From 4b951a2ed96b752b54db2907b8eba0004e5490a7 Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Tue, 24 Aug 2021 09:32:24 +0300 Subject: [PATCH] Add alternative output for multi-mx --- src/test/regress/bin/normalize.sed | 1 + src/test/regress/expected/multi_mx_call.out | 48 +- src/test/regress/expected/multi_mx_call_0.out | 533 +++++++++++++ .../regress/expected/multi_mx_explain.out | 42 - .../multi_mx_function_call_delegation.out | 89 ++- .../multi_mx_function_call_delegation_0.out | 724 ++++++++++++++++++ .../sql/multi_mx_function_call_delegation.sql | 5 +- 7 files changed, 1332 insertions(+), 110 deletions(-) create mode 100644 src/test/regress/expected/multi_mx_call_0.out create mode 100644 src/test/regress/expected/multi_mx_function_call_delegation_0.out diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index 5ede38e35..ae4b63ac7 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -240,3 +240,4 @@ s/ERROR: ROLLBACK is not allowed in an SQL function/ERROR: ROLLBACK is not all /Parent Relationship/d /Parent-Relationship/d s/function array_cat_agg\(anycompatiblearray\)/function array_cat_agg\(anyarray\)/g +s/TRIM\(BOTH FROM value\)/btrim\(value\)/g diff --git a/src/test/regress/expected/multi_mx_call.out b/src/test/regress/expected/multi_mx_call.out index 8511ed143..c493fdcd4 100644 --- a/src/test/regress/expected/multi_mx_call.out +++ b/src/test/regress/expected/multi_mx_call.out @@ -182,10 +182,10 @@ SET client_min_messages TO DEBUG1; call multi_mx_call.mx_call_proc(2, 0); DEBUG: stored procedure does not have co-located tables DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment y --------------------------------------------------------------------- @@ -266,10 +266,10 @@ begin; call multi_mx_call.mx_call_proc(2, 0); DEBUG: cannot push down CALL in multi-statement transaction DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment y --------------------------------------------------------------------- @@ -300,10 +300,10 @@ select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass call multi_mx_call.mx_call_proc(2, 0); DEBUG: cannot push down invalid distribution_argument_index DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment y --------------------------------------------------------------------- @@ -319,10 +319,10 @@ select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass call multi_mx_call.mx_call_proc(2, 0); DEBUG: cannot push down invalid distribution_argument_index DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment y --------------------------------------------------------------------- @@ -354,10 +354,10 @@ select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::re call multi_mx_call.mx_call_proc(2, 0); DEBUG: cannot push down function call for replicated distributed tables DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment y --------------------------------------------------------------------- @@ -449,10 +449,10 @@ SET client_min_messages TO DEBUG1; call multi_mx_call.mx_call_proc(2, 0); DEBUG: there is no worker node with metadata DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment y --------------------------------------------------------------------- @@ -495,10 +495,10 @@ DETAIL: A distributed function is created. To make sure subsequent commands see call multi_mx_call.mx_call_proc(2, mx_call_add(3, 4)); DEBUG: distribution argument value must be a constant DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment y --------------------------------------------------------------------- @@ -517,10 +517,10 @@ DEBUG: pushing down the procedure call multi_mx_call.mx_call_proc(floor(random())::int, 2); DEBUG: arguments in a distributed stored procedure must be constant expressions DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment y --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_mx_call_0.out b/src/test/regress/expected/multi_mx_call_0.out new file mode 100644 index 000000000..8511ed143 --- /dev/null +++ b/src/test/regress/expected/multi_mx_call_0.out @@ -0,0 +1,533 @@ +-- Test passing off CALL to mx workers +create schema multi_mx_call; +set search_path to multi_mx_call, public; +-- Create worker-local tables to test procedure calls were routed +set citus.shard_replication_factor to 2; +-- This table requires specific settings, create before getting into things +create table mx_call_dist_table_replica(id int, val int); +select create_distributed_table('mx_call_dist_table_replica', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5); +set citus.shard_replication_factor to 1; +-- +-- Create tables and procedures we want to use in tests +-- +create table mx_call_dist_table_1(id int, val int); +select create_distributed_table('mx_call_dist_table_1', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +insert into mx_call_dist_table_1 values (3,1),(4,5),(9,2),(6,5),(3,5); +create table mx_call_dist_table_2(id int, val int); +select create_distributed_table('mx_call_dist_table_2', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +insert into mx_call_dist_table_2 values (1,1),(1,2),(2,2),(3,3),(3,4); +create table mx_call_dist_table_bigint(id bigint, val bigint); +select create_distributed_table('mx_call_dist_table_bigint', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +insert into mx_call_dist_table_bigint values (1,1),(1,2),(2,2),(3,3),(3,4); +create table mx_call_dist_table_ref(id int, val int); +select create_reference_table('mx_call_dist_table_ref'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +insert into mx_call_dist_table_ref values (2,7),(1,8),(2,8),(1,8),(2,8); +create type mx_call_enum as enum ('A', 'S', 'D', 'F'); +create table mx_call_dist_table_enum(id int, key mx_call_enum); +select create_distributed_table('mx_call_dist_table_enum', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +insert into mx_call_dist_table_enum values (1,'S'),(2,'A'),(3,'D'),(4,'F'); +-- test that a distributed function can be colocated with a reference table +CREATE TABLE ref(groupid int); +SELECT create_reference_table('ref'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE PROCEDURE my_group_id_proc() +LANGUAGE plpgsql +SET search_path FROM CURRENT +AS $$ +DECLARE + gid int; +BEGIN + SELECT groupid INTO gid + FROM pg_dist_local_group; + + INSERT INTO ref(groupid) VALUES (gid); +END; +$$; +SELECT create_distributed_function('my_group_id_proc()', colocate_with := 'ref'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +CALL my_group_id_proc(); +CALL my_group_id_proc(); +SELECT DISTINCT(groupid) FROM ref ORDER BY 1; + groupid +--------------------------------------------------------------------- + 14 +(1 row) + +TRUNCATE TABLE ref; +-- test round robin task assignment policy uses different workers on consecutive procedure calls. +SET citus.task_assignment_policy TO 'round-robin'; +CALL my_group_id_proc(); +CALL my_group_id_proc(); +CALL my_group_id_proc(); +SELECT DISTINCT(groupid) FROM ref ORDER BY 1; + groupid +--------------------------------------------------------------------- + 14 + 18 +(2 rows) + +TRUNCATE TABLE ref; +RESET citus.task_assignment_policy; +CREATE PROCEDURE mx_call_proc(x int, INOUT y int) +LANGUAGE plpgsql AS $$ +BEGIN + -- groupid is 0 in coordinator and non-zero in workers, so by using it here + -- we make sure the procedure is being executed in the worker. + y := x + (select case groupid when 0 then 1 else 0 end from pg_dist_local_group); + -- we also make sure that we can run distributed queries in the procedures + -- that are routed to the workers. + y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id); +END;$$; +CREATE PROCEDURE mx_call_proc_bigint(x bigint, INOUT y bigint) +LANGUAGE plpgsql AS $$ +BEGIN + y := x + y * 2; +END;$$; +-- create another procedure which verifies: +-- 1. we work fine with multiple return columns +-- 2. we work fine in combination with custom types +CREATE PROCEDURE mx_call_proc_custom_types(INOUT x mx_call_enum, INOUT y mx_call_enum) +LANGUAGE plpgsql AS $$ +BEGIN + y := x; + x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); +END;$$; +-- Test that undistributed procedures have no issue executing +call multi_mx_call.mx_call_proc(2, 0); + y +--------------------------------------------------------------------- + 29 +(1 row) + +call multi_mx_call.mx_call_proc_custom_types('S', 'A'); + x | y +--------------------------------------------------------------------- + F | S +(1 row) + +-- Same for unqualified names +call mx_call_proc(2, 0); + y +--------------------------------------------------------------------- + 29 +(1 row) + +call mx_call_proc_custom_types('S', 'A'); + x | y +--------------------------------------------------------------------- + F | S +(1 row) + +-- Mark both procedures as distributed ... +select create_distributed_function('mx_call_proc(int,int)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +select create_distributed_function('mx_call_proc_bigint(bigint,bigint)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_call_enum)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +-- We still don't route them to the workers, because they aren't +-- colocated with any distributed tables. +SET client_min_messages TO DEBUG1; +call multi_mx_call.mx_call_proc(2, 0); +DEBUG: stored procedure does not have co-located tables +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment + y +--------------------------------------------------------------------- + 29 +(1 row) + +call mx_call_proc_bigint(4, 2); +DEBUG: stored procedure does not have co-located tables + y +--------------------------------------------------------------------- + 8 +(1 row) + +call multi_mx_call.mx_call_proc_custom_types('S', 'A'); +DEBUG: stored procedure does not have co-located tables + x | y +--------------------------------------------------------------------- + F | S +(1 row) + +-- Mark them as colocated with a table. Now we should route them to workers. +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); + colocate_proc_with_table +--------------------------------------------------------------------- + +(1 row) + +select colocate_proc_with_table('mx_call_proc_bigint', 'mx_call_dist_table_bigint'::regclass, 1); + colocate_proc_with_table +--------------------------------------------------------------------- + +(1 row) + +select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1); + colocate_proc_with_table +--------------------------------------------------------------------- + +(1 row) + +call multi_mx_call.mx_call_proc(2, 0); +DEBUG: pushing down the procedure + y +--------------------------------------------------------------------- + 28 +(1 row) + +call multi_mx_call.mx_call_proc_custom_types('S', 'A'); +DEBUG: pushing down the procedure + x | y +--------------------------------------------------------------------- + S | S +(1 row) + +call mx_call_proc(2, 0); +DEBUG: pushing down the procedure + y +--------------------------------------------------------------------- + 28 +(1 row) + +call mx_call_proc_custom_types('S', 'A'); +DEBUG: pushing down the procedure + x | y +--------------------------------------------------------------------- + S | S +(1 row) + +-- Test implicit cast of int to bigint +call mx_call_proc_bigint(4, 2); +DEBUG: pushing down the procedure + y +--------------------------------------------------------------------- + 8 +(1 row) + +-- We don't allow distributing calls inside transactions +begin; +call multi_mx_call.mx_call_proc(2, 0); +DEBUG: cannot push down CALL in multi-statement transaction +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment + y +--------------------------------------------------------------------- + 29 +(1 row) + +commit; +-- Drop the table colocated with mx_call_proc_custom_types. Now it shouldn't +-- be routed to workers anymore. +SET client_min_messages TO NOTICE; +drop table mx_call_dist_table_enum; +SET client_min_messages TO DEBUG1; +call multi_mx_call.mx_call_proc_custom_types('S', 'A'); +DEBUG: stored procedure does not have co-located tables + x | y +--------------------------------------------------------------------- + F | S +(1 row) + +-- Make sure we do bounds checking on distributed argument index +-- This also tests that we have cache invalidation for pg_dist_object updates +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1); + colocate_proc_with_table +--------------------------------------------------------------------- + +(1 row) + +call multi_mx_call.mx_call_proc(2, 0); +DEBUG: cannot push down invalid distribution_argument_index +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment + y +--------------------------------------------------------------------- + 29 +(1 row) + +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2); + colocate_proc_with_table +--------------------------------------------------------------------- + +(1 row) + +call multi_mx_call.mx_call_proc(2, 0); +DEBUG: cannot push down invalid distribution_argument_index +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment + y +--------------------------------------------------------------------- + 29 +(1 row) + +-- We support colocating with reference tables +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, NULL); + colocate_proc_with_table +--------------------------------------------------------------------- + +(1 row) + +call multi_mx_call.mx_call_proc(2, 0); +DEBUG: will push down CALL for reference tables +DEBUG: pushing down the procedure + y +--------------------------------------------------------------------- + 28 +(1 row) + +-- We don't currently support colocating with replicated tables +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1); + colocate_proc_with_table +--------------------------------------------------------------------- + +(1 row) + +call multi_mx_call.mx_call_proc(2, 0); +DEBUG: cannot push down function call for replicated distributed tables +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment + y +--------------------------------------------------------------------- + 29 +(1 row) + +SET client_min_messages TO NOTICE; +drop table mx_call_dist_table_replica; +SET client_min_messages TO DEBUG1; +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); + colocate_proc_with_table +--------------------------------------------------------------------- + +(1 row) + +-- Test that we handle transactional constructs correctly inside a procedure +-- that is routed to the workers. +CREATE PROCEDURE mx_call_proc_tx(x int) LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO multi_mx_call.mx_call_dist_table_1 VALUES (x, -1), (x+1, 4); + COMMIT; + UPDATE multi_mx_call.mx_call_dist_table_1 SET val = val+1 WHERE id >= x; + ROLLBACK; + -- Now do the final update! + UPDATE multi_mx_call.mx_call_dist_table_1 SET val = val-1 WHERE id >= x; +END;$$; +-- before distribution ... +CALL multi_mx_call.mx_call_proc_tx(10); +-- after distribution ... +select create_distributed_function('mx_call_proc_tx(int)', '$1', 'mx_call_dist_table_1'); +DEBUG: switching to sequential query execution mode +DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +CALL multi_mx_call.mx_call_proc_tx(20); +DEBUG: pushing down the procedure +SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val; + id | val +--------------------------------------------------------------------- + 3 | 1 + 3 | 5 + 4 | 5 + 6 | 5 + 9 | 2 + 10 | -2 + 11 | 3 + 20 | -2 + 21 | 3 +(9 rows) + +-- Test that we properly propagate errors raised from procedures. +CREATE PROCEDURE mx_call_proc_raise(x int) LANGUAGE plpgsql AS $$ +BEGIN + RAISE WARNING 'warning'; + RAISE EXCEPTION 'error'; +END;$$; +select create_distributed_function('mx_call_proc_raise(int)', '$1', 'mx_call_dist_table_1'); +DEBUG: switching to sequential query execution mode +DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +\set VERBOSITY terse +call multi_mx_call.mx_call_proc_raise(2); +DEBUG: pushing down the procedure +WARNING: warning +ERROR: error +\set VERBOSITY default +-- Test that we don't propagate to non-metadata worker nodes +SET client_min_messages TO WARNING; +select stop_metadata_sync_to_node('localhost', :worker_1_port); + stop_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +select stop_metadata_sync_to_node('localhost', :worker_2_port); + stop_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO DEBUG1; +call multi_mx_call.mx_call_proc(2, 0); +DEBUG: there is no worker node with metadata +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment + y +--------------------------------------------------------------------- + 29 +(1 row) + +SET client_min_messages TO NOTICE; +select start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +select start_metadata_sync_to_node('localhost', :worker_2_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +-- stop_metadata_sync_to_node()/start_metadata_sync_to_node() might make +-- worker backend caches inconsistent. Reconnect to coordinator to use +-- new worker connections, hence new backends. +\c - - - :master_port +SET search_path to multi_mx_call, public; +SET client_min_messages TO DEBUG1; +-- +-- Test non-const parameter values +-- +CREATE FUNCTION mx_call_add(int, int) RETURNS int + AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE; +SELECT create_distributed_function('mx_call_add(int,int)'); +DEBUG: switching to sequential query execution mode +DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +-- non-const distribution parameters cannot be pushed down +call multi_mx_call.mx_call_proc(2, mx_call_add(3, 4)); +DEBUG: distribution argument value must be a constant +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment + y +--------------------------------------------------------------------- + 29 +(1 row) + +-- non-const parameter can be pushed down +call multi_mx_call.mx_call_proc(multi_mx_call.mx_call_add(3, 4), 2); +DEBUG: pushing down the procedure + y +--------------------------------------------------------------------- + 33 +(1 row) + +-- volatile parameter cannot be pushed down +call multi_mx_call.mx_call_proc(floor(random())::int, 2); +DEBUG: arguments in a distributed stored procedure must be constant expressions +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment + y +--------------------------------------------------------------------- + 27 +(1 row) + +reset client_min_messages; +\set VERBOSITY terse +drop schema multi_mx_call cascade; +NOTICE: drop cascades to 13 other objects diff --git a/src/test/regress/expected/multi_mx_explain.out b/src/test/regress/expected/multi_mx_explain.out index ab996275c..2c58dd003 100644 --- a/src/test/regress/expected/multi_mx_explain.out +++ b/src/test/regress/expected/multi_mx_explain.out @@ -88,13 +88,11 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) "Node Type": "Aggregate", "Strategy": "Hashed", "Partial Mode": "Simple", - "Parent Relationship": "Outer", "Parallel Aware": false, "Group Key": ["remote_scan.l_quantity"], "Plans": [ { "Node Type": "Custom Scan", - "Parent Relationship": "Outer", "Custom Plan Provider": "Citus Adaptive", "Parallel Aware": false, "Distributed Query": { @@ -116,7 +114,6 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) "Plans": [ { "Node Type": "Seq Scan", - "Parent Relationship": "Outer", "Parallel Aware": false, "Relation Name": "lineitem_mx_1220052", "Alias": "lineitem_mx" @@ -162,7 +159,6 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Aggregate Hashed Simple - Outer false remote_scan.l_quantity @@ -170,7 +166,6 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Custom Scan - Outer Citus Adaptive false @@ -194,7 +189,6 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Seq Scan - Outer false lineitem_mx_1220052 lineitem_mx @@ -234,13 +228,11 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) - Node Type: "Aggregate" Strategy: "Hashed" Partial Mode: "Simple" - Parent Relationship: "Outer" Parallel Aware: false Group Key: - "remote_scan.l_quantity" Plans: - Node Type: "Custom Scan" - Parent Relationship: "Outer" Custom Plan Provider: "Citus Adaptive" Parallel Aware: false Distributed Query: @@ -259,7 +251,6 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) - "l_quantity" Plans: - Node Type: "Seq Scan" - Parent Relationship: "Outer" Parallel Aware: false Relation Name: "lineitem_mx_1220052" Alias: "lineitem_mx" @@ -537,7 +528,6 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) "Plans": [ { "Node Type": "Custom Scan", - "Parent Relationship": "Outer", "Custom Plan Provider": "Citus Adaptive", "Parallel Aware": false, "Distributed Query": { @@ -558,7 +548,6 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) "Plans": [ { "Node Type": "Hash Join", - "Parent Relationship": "Outer", "Parallel Aware": false, "Join Type": "Inner", "Inner Unique": false, @@ -566,7 +555,6 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) "Plans": [ { "Node Type": "Hash Join", - "Parent Relationship": "Outer", "Parallel Aware": false, "Join Type": "Inner", "Inner Unique": false, @@ -574,19 +562,16 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) "Plans": [ { "Node Type": "Seq Scan", - "Parent Relationship": "Outer", "Parallel Aware": false, "Relation Name": "supplier_mx_1220087", "Alias": "supplier_mx" }, { "Node Type": "Hash", - "Parent Relationship": "Inner", "Parallel Aware": false, "Plans": [ { "Node Type": "Seq Scan", - "Parent Relationship": "Outer", "Parallel Aware": false, "Relation Name": "lineitem_mx_1220052", "Alias": "lineitem_mx" @@ -597,12 +582,10 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) }, { "Node Type": "Hash", - "Parent Relationship": "Inner", "Parallel Aware": false, "Plans": [ { "Node Type": "Hash Join", - "Parent Relationship": "Outer", "Parallel Aware": false, "Join Type": "Inner", "Inner Unique": false, @@ -610,19 +593,16 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) "Plans": [ { "Node Type": "Seq Scan", - "Parent Relationship": "Outer", "Parallel Aware": false, "Relation Name": "customer_mx_1220084", "Alias": "customer_mx" }, { "Node Type": "Hash", - "Parent Relationship": "Inner", "Parallel Aware": false, "Plans": [ { "Node Type": "Seq Scan", - "Parent Relationship": "Outer", "Parallel Aware": false, "Relation Name": "orders_mx_1220068", "Alias": "orders_mx" @@ -673,7 +653,6 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Custom Scan - Outer Citus Adaptive false @@ -694,7 +673,6 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Hash Join - Outer false Inner false @@ -702,7 +680,6 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Hash Join - Outer false Inner false @@ -710,19 +687,16 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Seq Scan - Outer false supplier_mx_1220087 supplier_mx Hash - Inner false Seq Scan - Outer false lineitem_mx_1220052 lineitem_mx @@ -733,12 +707,10 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Hash - Inner false Hash Join - Outer false Inner false @@ -746,19 +718,16 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Seq Scan - Outer false customer_mx_1220084 customer_mx Hash - Inner false Seq Scan - Outer false orders_mx_1220068 orders_mx @@ -805,7 +774,6 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) Parallel Aware: false Plans: - Node Type: "Custom Scan" - Parent Relationship: "Outer" Custom Plan Provider: "Citus Adaptive" Parallel Aware: false Distributed Query: @@ -822,55 +790,45 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) Parallel Aware: false Plans: - Node Type: "Hash Join" - Parent Relationship: "Outer" Parallel Aware: false Join Type: "Inner" Inner Unique: false Hash Cond: "(lineitem_mx.l_orderkey = orders_mx.o_orderkey)" Plans: - Node Type: "Hash Join" - Parent Relationship: "Outer" Parallel Aware: false Join Type: "Inner" Inner Unique: false Hash Cond: "(supplier_mx.s_suppkey = lineitem_mx.l_suppkey)" Plans: - Node Type: "Seq Scan" - Parent Relationship: "Outer" Parallel Aware: false Relation Name: "supplier_mx_1220087" Alias: "supplier_mx" - Node Type: "Hash" - Parent Relationship: "Inner" Parallel Aware: false Plans: - Node Type: "Seq Scan" - Parent Relationship: "Outer" Parallel Aware: false Relation Name: "lineitem_mx_1220052" Alias: "lineitem_mx" - Node Type: "Hash" - Parent Relationship: "Inner" Parallel Aware: false Plans: - Node Type: "Hash Join" - Parent Relationship: "Outer" Parallel Aware: false Join Type: "Inner" Inner Unique: false Hash Cond: "(customer_mx.c_custkey = orders_mx.o_custkey)" Plans: - Node Type: "Seq Scan" - Parent Relationship: "Outer" Parallel Aware: false Relation Name: "customer_mx_1220084" Alias: "customer_mx" - Node Type: "Hash" - Parent Relationship: "Inner" Parallel Aware: false Plans: - Node Type: "Seq Scan" - Parent Relationship: "Outer" Parallel Aware: false Relation Name: "orders_mx_1220068" Alias: "orders_mx" diff --git a/src/test/regress/expected/multi_mx_function_call_delegation.out b/src/test/regress/expected/multi_mx_function_call_delegation.out index 817cc92a7..dd5bfdbfc 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation.out @@ -143,10 +143,10 @@ SET client_min_messages TO DEBUG1; select mx_call_func(2, 0); DEBUG: function does not have co-located tables DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment mx_call_func --------------------------------------------------------------------- @@ -230,26 +230,33 @@ DEBUG: pushing down the function call (S,S) (1 row) --- This is currently an undetected failure when using the binary protocol --- It should not be enabled by default until this is resolved. The tests above --- will fail too, when changing the default to TRUE; +-- this is fixed with pg14 and this will fail prior to +-- pg 14 SET citus.enable_binary_protocol = TRUE; select mx_call_func_custom_types('S', 'A'); DEBUG: pushing down the function call -ERROR: wrong data type: XXXX, expected XXXX + mx_call_func_custom_types +--------------------------------------------------------------------- + (S,S) +(1 row) + select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); DEBUG: pushing down the function call -ERROR: wrong data type: XXXX, expected XXXX + mx_call_func_custom_types +--------------------------------------------------------------------- + (S,S) +(1 row) + RESET citus.enable_binary_protocol; -- We don't allow distributing calls inside transactions begin; select mx_call_func(2, 0); DEBUG: not pushing down function calls in a multi-statement transaction DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment mx_call_func --------------------------------------------------------------------- @@ -280,10 +287,10 @@ select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass select mx_call_func(2, 0); DEBUG: cannot push down invalid distribution_argument_index DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment mx_call_func --------------------------------------------------------------------- @@ -299,10 +306,10 @@ select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass select mx_call_func(2, 0); DEBUG: cannot push down invalid distribution_argument_index DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment mx_call_func --------------------------------------------------------------------- @@ -333,10 +340,10 @@ select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_replica'::re select mx_call_func(2, 0); DEBUG: cannot push down function call for replicated distributed tables DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment mx_call_func --------------------------------------------------------------------- @@ -515,10 +522,10 @@ SET client_min_messages TO DEBUG1; select mx_call_func(2, 0); DEBUG: the worker node does not have metadata DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment mx_call_func --------------------------------------------------------------------- @@ -562,10 +569,10 @@ DETAIL: A distributed function is created. To make sure subsequent commands see select mx_call_func((select x + 1 from mx_call_add(3, 4) x), 2); DEBUG: arguments in a distributed function must not contain subqueries DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (9 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((9 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment mx_call_func --------------------------------------------------------------------- @@ -576,10 +583,10 @@ PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment select mx_call_func(floor(random())::int, 2); DEBUG: arguments in a distributed function must be constant expressions DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment mx_call_func --------------------------------------------------------------------- @@ -589,10 +596,10 @@ PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment -- test forms we don't distribute select * from mx_call_func(2, 0); DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment y --------------------------------------------------------------------- @@ -615,10 +622,10 @@ select mx_call_func(2, 0) from mx_call_dist_table_1; select mx_call_func(2, 0) where mx_call_func(0, 2) = 0; DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment mx_call_func --------------------------------------------------------------------- @@ -626,16 +633,16 @@ PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment select mx_call_func(2, 0), mx_call_func(0, 2); DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment mx_call_func | mx_call_func --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_mx_function_call_delegation_0.out b/src/test/regress/expected/multi_mx_function_call_delegation_0.out new file mode 100644 index 000000000..e16ba2922 --- /dev/null +++ b/src/test/regress/expected/multi_mx_function_call_delegation_0.out @@ -0,0 +1,724 @@ +-- Test passing off function call to mx workers +CREATE SCHEMA multi_mx_function_call_delegation; +SET search_path TO multi_mx_function_call_delegation, public; +SET citus.shard_replication_factor TO 2; +-- This table requires specific settings, create before getting into things +create table mx_call_dist_table_replica(id int, val int); +select create_distributed_table('mx_call_dist_table_replica', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5); +SET citus.shard_replication_factor TO 1; +-- +-- Create tables and functions we want to use in tests +-- +create table mx_call_dist_table_1(id int, val int); +select create_distributed_table('mx_call_dist_table_1', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +insert into mx_call_dist_table_1 values (3,1),(4,5),(9,2),(6,5),(3,5); +create table mx_call_dist_table_2(id int, val int); +select create_distributed_table('mx_call_dist_table_2', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +insert into mx_call_dist_table_2 values (1,1),(1,2),(2,2),(3,3),(3,4); +create table mx_call_dist_table_bigint(id bigint, val bigint); +select create_distributed_table('mx_call_dist_table_bigint', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +insert into mx_call_dist_table_bigint values (1,1),(1,2),(2,2),(3,3),(3,4); +create table mx_call_dist_table_ref(id int, val int); +select create_reference_table('mx_call_dist_table_ref'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +insert into mx_call_dist_table_ref values (2,7),(1,8),(2,8),(1,8),(2,8); +create type mx_call_enum as enum ('A', 'S', 'D', 'F'); +create table mx_call_dist_table_enum(id int, key mx_call_enum); +select create_distributed_table('mx_call_dist_table_enum', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +insert into mx_call_dist_table_enum values (1,'S'),(2,'A'),(3,'D'),(4,'F'); +CREATE FUNCTION squares(int) RETURNS SETOF RECORD + AS $$ SELECT i, i * i FROM generate_series(1, $1) i $$ + LANGUAGE SQL; +CREATE FUNCTION mx_call_func(x int, INOUT y int) +LANGUAGE plpgsql AS $$ +BEGIN + -- groupid is 0 in coordinator and non-zero in workers, so by using it here + -- we make sure the function is being executed in the worker. + y := x + (select case groupid when 0 then 1 else 0 end from pg_dist_local_group); + -- we also make sure that we can run distributed queries in the functions + -- that are routed to the workers. + y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id); +END;$$; +CREATE FUNCTION mx_call_func_bigint(x bigint, INOUT y bigint) +LANGUAGE plpgsql AS $$ +BEGIN + y := x + y * 2; +END;$$; +-- create another function which verifies: +-- 1. we work fine with multiple return columns +-- 2. we work fine in combination with custom types +CREATE FUNCTION mx_call_func_custom_types(INOUT x mx_call_enum, INOUT y mx_call_enum) +LANGUAGE plpgsql AS $$ +BEGIN + y := x; + x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); +END;$$; +-- Test that undistributed functions have no issue executing +select multi_mx_function_call_delegation.mx_call_func(2, 0); + mx_call_func +--------------------------------------------------------------------- + 29 +(1 row) + +select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); + mx_call_func_custom_types +--------------------------------------------------------------------- + (F,S) +(1 row) + +select squares(4); + squares +--------------------------------------------------------------------- + (1,1) + (2,4) + (3,9) + (4,16) +(4 rows) + +-- Same for unqualified name +select mx_call_func(2, 0); + mx_call_func +--------------------------------------------------------------------- + 29 +(1 row) + +-- Mark both functions as distributed ... +select create_distributed_function('mx_call_func(int,int)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +select create_distributed_function('mx_call_func_bigint(bigint,bigint)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_call_enum)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +select create_distributed_function('squares(int)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +-- We still don't route them to the workers, because they aren't +-- colocated with any distributed tables. +SET client_min_messages TO DEBUG1; +select mx_call_func(2, 0); +DEBUG: function does not have co-located tables +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +--------------------------------------------------------------------- + 29 +(1 row) + +select multi_mx_function_call_delegation.mx_call_func_bigint(4, 2); +DEBUG: function does not have co-located tables + mx_call_func_bigint +--------------------------------------------------------------------- + 8 +(1 row) + +select mx_call_func_custom_types('S', 'A'); +DEBUG: function does not have co-located tables + mx_call_func_custom_types +--------------------------------------------------------------------- + (F,S) +(1 row) + +-- Mark them as colocated with a table. Now we should route them to workers. +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 1); + colocate_proc_with_table +--------------------------------------------------------------------- + +(1 row) + +select colocate_proc_with_table('mx_call_func_bigint', 'mx_call_dist_table_bigint'::regclass, 1); + colocate_proc_with_table +--------------------------------------------------------------------- + +(1 row) + +select colocate_proc_with_table('mx_call_func_custom_types', 'mx_call_dist_table_enum'::regclass, 1); + colocate_proc_with_table +--------------------------------------------------------------------- + +(1 row) + +select colocate_proc_with_table('squares', 'mx_call_dist_table_2'::regclass, 0); + colocate_proc_with_table +--------------------------------------------------------------------- + +(1 row) + +select mx_call_func(2, 0); +DEBUG: pushing down the function call + mx_call_func +--------------------------------------------------------------------- + 28 +(1 row) + +select mx_call_func_bigint(4, 2); +DEBUG: pushing down the function call + mx_call_func_bigint +--------------------------------------------------------------------- + 8 +(1 row) + +select mx_call_func_custom_types('S', 'A'); +DEBUG: pushing down the function call + mx_call_func_custom_types +--------------------------------------------------------------------- + (S,S) +(1 row) + +select squares(4); +DEBUG: pushing down the function call +ERROR: input of anonymous composite types is not implemented +select multi_mx_function_call_delegation.mx_call_func(2, 0); +DEBUG: pushing down the function call + mx_call_func +--------------------------------------------------------------------- + 28 +(1 row) + +select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); +DEBUG: pushing down the function call + mx_call_func_custom_types +--------------------------------------------------------------------- + (S,S) +(1 row) + +-- this is fixed with pg14 and this will fail prior to +-- pg 14 +SET citus.enable_binary_protocol = TRUE; +select mx_call_func_custom_types('S', 'A'); +DEBUG: pushing down the function call +ERROR: wrong data type: XXXX, expected XXXX +select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); +DEBUG: pushing down the function call +ERROR: wrong data type: XXXX, expected XXXX +RESET citus.enable_binary_protocol; +-- We don't allow distributing calls inside transactions +begin; +select mx_call_func(2, 0); +DEBUG: not pushing down function calls in a multi-statement transaction +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +--------------------------------------------------------------------- + 29 +(1 row) + +commit; +-- Drop the table colocated with mx_call_func_custom_types. Now it shouldn't +-- be routed to workers anymore. +SET client_min_messages TO NOTICE; +drop table mx_call_dist_table_enum; +SET client_min_messages TO DEBUG1; +select mx_call_func_custom_types('S', 'A'); +DEBUG: function does not have co-located tables + mx_call_func_custom_types +--------------------------------------------------------------------- + (F,S) +(1 row) + +-- Make sure we do bounds checking on distributed argument index +-- This also tests that we have cache invalidation for pg_dist_object updates +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, -1); + colocate_proc_with_table +--------------------------------------------------------------------- + +(1 row) + +select mx_call_func(2, 0); +DEBUG: cannot push down invalid distribution_argument_index +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +--------------------------------------------------------------------- + 29 +(1 row) + +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 2); + colocate_proc_with_table +--------------------------------------------------------------------- + +(1 row) + +select mx_call_func(2, 0); +DEBUG: cannot push down invalid distribution_argument_index +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +--------------------------------------------------------------------- + 29 +(1 row) + +-- We don't currently support colocating with reference tables +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_ref'::regclass, 1); + colocate_proc_with_table +--------------------------------------------------------------------- + +(1 row) + +select mx_call_func(2, 0); +DEBUG: pushing down the function call + mx_call_func +--------------------------------------------------------------------- + 28 +(1 row) + +-- We don't currently support colocating with replicated tables +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_replica'::regclass, 1); + colocate_proc_with_table +--------------------------------------------------------------------- + +(1 row) + +select mx_call_func(2, 0); +DEBUG: cannot push down function call for replicated distributed tables +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +--------------------------------------------------------------------- + 29 +(1 row) + +SET client_min_messages TO NOTICE; +drop table mx_call_dist_table_replica; +SET client_min_messages TO DEBUG1; +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 1); + colocate_proc_with_table +--------------------------------------------------------------------- + +(1 row) + +-- Test table returning functions. +CREATE FUNCTION mx_call_func_tbl(x int) +RETURNS TABLE (p0 int, p1 int) +LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 VALUES (x, -1), (x+1, 4); + UPDATE multi_mx_function_call_delegation.mx_call_dist_table_1 SET val = val+1 WHERE id >= x; + UPDATE multi_mx_function_call_delegation.mx_call_dist_table_1 SET val = val-1 WHERE id >= x; + RETURN QUERY + SELECT id, val + FROM multi_mx_function_call_delegation.mx_call_dist_table_1 t + WHERE id >= x + ORDER BY 1, 2; +END;$$; +-- before distribution ... +select mx_call_func_tbl(10); + mx_call_func_tbl +--------------------------------------------------------------------- + (10,-1) + (11,4) +(2 rows) + +-- after distribution ... +select create_distributed_function('mx_call_func_tbl(int)', '$1', 'mx_call_dist_table_1'); +DEBUG: switching to sequential query execution mode +DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +select mx_call_func_tbl(20); +DEBUG: pushing down the function call + mx_call_func_tbl +--------------------------------------------------------------------- + (20,-1) + (21,4) +(2 rows) + +-- Test that we properly propagate errors raised from procedures. +CREATE FUNCTION mx_call_func_raise(x int) +RETURNS void LANGUAGE plpgsql AS $$ +BEGIN + RAISE WARNING 'warning'; + RAISE EXCEPTION 'error'; +END;$$; +select create_distributed_function('mx_call_func_raise(int)', '$1', 'mx_call_dist_table_1'); +DEBUG: switching to sequential query execution mode +DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +\set VERBOSITY terse +select mx_call_func_raise(2); +DEBUG: pushing down the function call +WARNING: warning +ERROR: error +\set VERBOSITY default +-- Don't push-down when doing INSERT INTO ... SELECT func(); +SET client_min_messages TO ERROR; +CREATE TABLE test (x int primary key); +SELECT create_distributed_table('test','x'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE FUNCTION delegated_function(a int) +RETURNS int +LANGUAGE plpgsql +AS $function$ +DECLARE +BEGIN + INSERT INTO multi_mx_function_call_delegation.test VALUES (a); + INSERT INTO multi_mx_function_call_delegation.test VALUES (a + 1); + RETURN a+2; +END; +$function$; +SELECT create_distributed_function('delegated_function(int)', 'a'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO DEBUG1; +INSERT INTO test SELECT delegated_function(1); +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: not pushing down function calls in INSERT ... SELECT +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- Don't push down in subqueries or CTEs. +SELECT * FROM test WHERE not exists( + SELECT delegated_function(4) +); +DEBUG: not pushing down function calls in CTEs or Subqueries +DEBUG: generating subplan XXX_1 for subquery SELECT multi_mx_function_call_delegation.delegated_function(4) AS delegated_function +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT x FROM multi_mx_function_call_delegation.test WHERE (NOT (EXISTS (SELECT intermediate_result.delegated_function FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(delegated_function integer)))) + x +--------------------------------------------------------------------- +(0 rows) + +WITH r AS ( + SELECT delegated_function(7) +) SELECT * FROM test WHERE (SELECT count(*)=0 FROM r); +DEBUG: generating subplan XXX_1 for CTE r: SELECT multi_mx_function_call_delegation.delegated_function(7) AS delegated_function +DEBUG: not pushing down function calls in CTEs or Subqueries +DEBUG: generating subplan XXX_2 for subquery SELECT (count(*) OPERATOR(pg_catalog.=) 0) FROM (SELECT intermediate_result.delegated_function FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(delegated_function integer)) r +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT x FROM multi_mx_function_call_delegation.test WHERE (SELECT intermediate_result."?column?" FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result("?column?" boolean)) + x +--------------------------------------------------------------------- +(0 rows) + +WITH r AS ( + SELECT delegated_function(10) +), t AS ( + SELECT count(*) c FROM r +) SELECT * FROM test, t WHERE t.c=0; +DEBUG: CTE t is going to be inlined via distributed planning +DEBUG: generating subplan XXX_1 for CTE r: SELECT multi_mx_function_call_delegation.delegated_function(10) AS delegated_function +DEBUG: not pushing down function calls in CTEs or Subqueries +DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS c FROM (SELECT intermediate_result.delegated_function FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(delegated_function integer)) r +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT test.x, t.c FROM multi_mx_function_call_delegation.test, (SELECT intermediate_result.c FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(c bigint)) t WHERE (t.c OPERATOR(pg_catalog.=) 0) + x | c +--------------------------------------------------------------------- +(0 rows) + +WITH r AS ( + SELECT count(*) FROM test +), s AS ( + SELECT delegated_function(13) +), t AS ( + SELECT count(*) c FROM s +) SELECT * FROM test, r, t WHERE t.c=0; +DEBUG: CTE r is going to be inlined via distributed planning +DEBUG: CTE t is going to be inlined via distributed planning +DEBUG: generating subplan XXX_1 for CTE s: SELECT multi_mx_function_call_delegation.delegated_function(13) AS delegated_function +DEBUG: not pushing down function calls in CTEs or Subqueries +DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count FROM multi_mx_function_call_delegation.test +DEBUG: generating subplan XXX_3 for subquery SELECT count(*) AS c FROM (SELECT intermediate_result.delegated_function FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(delegated_function integer)) s +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT test.x, r.count, t.c FROM multi_mx_function_call_delegation.test, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) r, (SELECT intermediate_result.c FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(c bigint)) t WHERE (t.c OPERATOR(pg_catalog.=) 0) + x | count | c +--------------------------------------------------------------------- +(0 rows) + +-- Test that we don't propagate to non-metadata worker nodes +SET client_min_messages TO WARNING; +select stop_metadata_sync_to_node('localhost', :worker_1_port); + stop_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +select stop_metadata_sync_to_node('localhost', :worker_2_port); + stop_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO DEBUG1; +select mx_call_func(2, 0); +DEBUG: the worker node does not have metadata +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +--------------------------------------------------------------------- + 29 +(1 row) + +SET client_min_messages TO NOTICE; +select start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +select start_metadata_sync_to_node('localhost', :worker_2_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +-- stop_metadata_sync_to_node()/start_metadata_sync_to_node() might make +-- worker backend caches inconsistent. Reconnect to coordinator to use +-- new worker connections, hence new backends. +\c - - - :master_port +SET search_path to multi_mx_function_call_delegation, public; +SET client_min_messages TO DEBUG1; +SET citus.shard_replication_factor = 1; +-- +-- Test non-const parameter values +-- +CREATE FUNCTION mx_call_add(int, int) RETURNS int + AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE; +SELECT create_distributed_function('mx_call_add(int,int)', '$1'); +DEBUG: switching to sequential query execution mode +DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +-- subquery parameters cannot be pushed down +select mx_call_func((select x + 1 from mx_call_add(3, 4) x), 2); +DEBUG: arguments in a distributed function must not contain subqueries +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (9 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +--------------------------------------------------------------------- + 35 +(1 row) + +-- volatile parameter cannot be pushed down +select mx_call_func(floor(random())::int, 2); +DEBUG: arguments in a distributed function must be constant expressions +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +--------------------------------------------------------------------- + 27 +(1 row) + +-- test forms we don't distribute +select * from mx_call_func(2, 0); +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + y +--------------------------------------------------------------------- + 29 +(1 row) + +select mx_call_func(2, 0) from mx_call_dist_table_1; + mx_call_func +--------------------------------------------------------------------- + 28 + 28 + 28 + 28 + 28 + 28 + 28 + 28 + 28 +(9 rows) + +select mx_call_func(2, 0) where mx_call_func(0, 2) = 0; +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +--------------------------------------------------------------------- +(0 rows) + +select mx_call_func(2, 0), mx_call_func(0, 2); +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func | mx_call_func +--------------------------------------------------------------------- + 29 | 27 +(1 row) + +DO $$ BEGIN perform mx_call_func_tbl(40); END; $$; +DEBUG: not pushing down function calls in a multi-statement transaction +CONTEXT: SQL statement "SELECT mx_call_func_tbl(40)" +PL/pgSQL function inline_code_block line 1 at PERFORM +SELECT * FROM mx_call_dist_table_1 WHERE id >= 40 ORDER BY id, val; + id | val +--------------------------------------------------------------------- + 40 | -1 + 41 | 4 +(2 rows) + +-- Prepared statements. Repeat 8 times to test for generic plans +PREPARE call_plan (int, int) AS SELECT mx_call_func($1, $2); +EXECUTE call_plan(2, 0); +DEBUG: pushing down the function call + mx_call_func +--------------------------------------------------------------------- + 28 +(1 row) + +EXECUTE call_plan(2, 0); +DEBUG: pushing down the function call + mx_call_func +--------------------------------------------------------------------- + 28 +(1 row) + +EXECUTE call_plan(2, 0); +DEBUG: pushing down the function call + mx_call_func +--------------------------------------------------------------------- + 28 +(1 row) + +EXECUTE call_plan(2, 0); +DEBUG: pushing down the function call + mx_call_func +--------------------------------------------------------------------- + 28 +(1 row) + +EXECUTE call_plan(2, 0); +DEBUG: pushing down the function call + mx_call_func +--------------------------------------------------------------------- + 28 +(1 row) + +EXECUTE call_plan(2, 0); +DEBUG: pushing down the function call + mx_call_func +--------------------------------------------------------------------- + 28 +(1 row) + +EXECUTE call_plan(2, 0); +DEBUG: pushing down the function call + mx_call_func +--------------------------------------------------------------------- + 28 +(1 row) + +EXECUTE call_plan(2, 0); +DEBUG: pushing down the function call + mx_call_func +--------------------------------------------------------------------- + 28 +(1 row) + +\c - - - :worker_1_port +SET search_path TO multi_mx_function_call_delegation, public; +-- create_distributed_function is disallowed from worker nodes +select create_distributed_function('mx_call_func(int,int)'); +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. +\c - - - :master_port +SET search_path TO multi_mx_function_call_delegation, public; +RESET client_min_messages; +\set VERBOSITY terse +DROP SCHEMA multi_mx_function_call_delegation CASCADE; +NOTICE: drop cascades to 14 other objects diff --git a/src/test/regress/sql/multi_mx_function_call_delegation.sql b/src/test/regress/sql/multi_mx_function_call_delegation.sql index 4f7de1d92..513146a4e 100644 --- a/src/test/regress/sql/multi_mx_function_call_delegation.sql +++ b/src/test/regress/sql/multi_mx_function_call_delegation.sql @@ -103,9 +103,8 @@ select multi_mx_function_call_delegation.mx_call_func(2, 0); select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); --- This is currently an undetected failure when using the binary protocol --- It should not be enabled by default until this is resolved. The tests above --- will fail too, when changing the default to TRUE; +-- this is fixed with pg14 and this will fail prior to +-- pg 14 SET citus.enable_binary_protocol = TRUE; select mx_call_func_custom_types('S', 'A'); select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');