From f3545b92ef72b1e57cc05415f188f0a2f353f9df Mon Sep 17 00:00:00 2001 From: Hanefi Onaldi Date: Mon, 30 Sep 2019 17:22:24 +0300 Subject: [PATCH] Add PG12 test outputs --- .../multi_mx_function_call_delegation.out | 4 +- .../multi_mx_function_call_delegation_0.out | 539 ++++++++++++++++++ .../multi_unsupported_worker_operations_0.out | 367 ++++++++++++ 3 files changed, 908 insertions(+), 2 deletions(-) create mode 100644 src/test/regress/expected/multi_mx_function_call_delegation_0.out create mode 100644 src/test/regress/expected/multi_unsupported_worker_operations_0.out diff --git a/src/test/regress/expected/multi_mx_function_call_delegation.out b/src/test/regress/expected/multi_mx_function_call_delegation.out index 27ff2eb43..0a36c5f7b 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation.out @@ -369,8 +369,8 @@ DEBUG: pushing down the function call DEBUG: warning DETAIL: WARNING from localhost:57638 ERROR: error -CONTEXT: while executing command on localhost:57638 -PL/pgSQL function multi_mx_function_call_delegation.mx_call_func_raise(integer) line 4 at RAISE +CONTEXT: PL/pgSQL function multi_mx_function_call_delegation.mx_call_func_raise(integer) line 4 at RAISE +while executing command on localhost:57638 -- Don't push-down when doing INSERT INTO ... SELECT func(); SET client_min_messages TO ERROR; CREATE TABLE test (x int primary key); diff --git a/src/test/regress/expected/multi_mx_function_call_delegation_0.out b/src/test/regress/expected/multi_mx_function_call_delegation_0.out new file mode 100644 index 000000000..5c54674c3 --- /dev/null +++ b/src/test/regress/expected/multi_mx_function_call_delegation_0.out @@ -0,0 +1,539 @@ +-- Test passing off function call to mx workers +CREATE SCHEMA multi_mx_function_call_delegation; +SET search_path TO multi_mx_function_call_delegation, public; +SET citus.shard_replication_factor TO 2; +SET citus.replication_model TO 'statement'; +-- This table requires specific settings, create before getting into things +create table mx_call_dist_table_replica(id int, val int); +select create_distributed_table('mx_call_dist_table_replica', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5); +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; +-- +-- Create tables and functions we want to use in tests +-- +create table mx_call_dist_table_1(id int, val int); +select create_distributed_table('mx_call_dist_table_1', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +insert into mx_call_dist_table_1 values (3,1),(4,5),(9,2),(6,5),(3,5); +create table mx_call_dist_table_2(id int, val int); +select create_distributed_table('mx_call_dist_table_2', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +insert into mx_call_dist_table_2 values (1,1),(1,2),(2,2),(3,3),(3,4); +create table mx_call_dist_table_ref(id int, val int); +select create_reference_table('mx_call_dist_table_ref'); + create_reference_table +------------------------ + +(1 row) + +insert into mx_call_dist_table_ref values (2,7),(1,8),(2,8),(1,8),(2,8); +create type mx_call_enum as enum ('A', 'S', 'D', 'F'); +create table mx_call_dist_table_enum(id int, key mx_call_enum); +select create_distributed_table('mx_call_dist_table_enum', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +insert into mx_call_dist_table_enum values (1,'S'),(2,'A'),(3,'D'),(4,'F'); +CREATE FUNCTION squares(int) RETURNS SETOF RECORD + AS $$ SELECT i, i * i FROM generate_series(1, $1) i $$ + LANGUAGE SQL; +CREATE FUNCTION mx_call_func(x int, INOUT y int) +LANGUAGE plpgsql AS $$ +BEGIN + -- groupid is 0 in coordinator and non-zero in workers, so by using it here + -- we make sure the function is being executed in the worker. + y := x + (select case groupid when 0 then 1 else 0 end from pg_dist_local_group); + -- we also make sure that we can run distributed queries in the functions + -- that are routed to the workers. + y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id); +END;$$; +-- create another function which verifies: +-- 1. we work fine with multiple return columns +-- 2. we work fine in combination with custom types +CREATE FUNCTION mx_call_func_custom_types(INOUT x mx_call_enum, INOUT y mx_call_enum) +LANGUAGE plpgsql AS $$ +BEGIN + y := x; + x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); +END;$$; +-- Test that undistributed functions have no issue executing +select multi_mx_function_call_delegation.mx_call_func(2, 0); + mx_call_func +-------------- + 29 +(1 row) + +select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); + mx_call_func_custom_types +--------------------------- + (F,S) +(1 row) + +select squares(4); + squares +--------- + (1,1) + (2,4) + (3,9) + (4,16) +(4 rows) + +-- Same for unqualified name +select mx_call_func(2, 0); + mx_call_func +-------------- + 29 +(1 row) + +-- Mark both functions as distributed ... +select create_distributed_function('mx_call_func(int,int)'); + create_distributed_function +----------------------------- + +(1 row) + +select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_call_enum)'); + create_distributed_function +----------------------------- + +(1 row) + +select create_distributed_function('squares(int)'); + create_distributed_function +----------------------------- + +(1 row) + +-- We still don't route them to the workers, because they aren't +-- colocated with any distributed tables. +SET client_min_messages TO DEBUG1; +select mx_call_func(2, 0); +DEBUG: function does not have co-located tables +DEBUG: generating subplan 10_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +-------------- + 29 +(1 row) + +select mx_call_func_custom_types('S', 'A'); +DEBUG: function does not have co-located tables + mx_call_func_custom_types +--------------------------- + (F,S) +(1 row) + +-- Mark them as colocated with a table. Now we should route them to workers. +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 1); + colocate_proc_with_table +-------------------------- + +(1 row) + +select colocate_proc_with_table('mx_call_func_custom_types', 'mx_call_dist_table_enum'::regclass, 1); + colocate_proc_with_table +-------------------------- + +(1 row) + +select colocate_proc_with_table('squares', 'mx_call_dist_table_2'::regclass, 0); + colocate_proc_with_table +-------------------------- + +(1 row) + +select mx_call_func(2, 0); +DEBUG: pushing down the function call + mx_call_func +-------------- + 28 +(1 row) + +select mx_call_func_custom_types('S', 'A'); +DEBUG: pushing down the function call + mx_call_func_custom_types +--------------------------- + (S,S) +(1 row) + +select squares(4); +DEBUG: pushing down the function call +ERROR: input of anonymous composite types is not implemented +select multi_mx_function_call_delegation.mx_call_func(2, 0); +DEBUG: pushing down the function call + mx_call_func +-------------- + 28 +(1 row) + +select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); +DEBUG: pushing down the function call + mx_call_func_custom_types +--------------------------- + (S,S) +(1 row) + +-- We don't allow distributing calls inside transactions +begin; +select mx_call_func(2, 0); +DEBUG: not pushing down function calls in a multi-statement transaction +DEBUG: generating subplan 12_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 12 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('12_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +-------------- + 29 +(1 row) + +commit; +-- Drop the table colocated with mx_call_func_custom_types. Now it shouldn't +-- be routed to workers anymore. +SET client_min_messages TO NOTICE; +drop table mx_call_dist_table_enum; +SET client_min_messages TO DEBUG1; +select mx_call_func_custom_types('S', 'A'); +DEBUG: function does not have co-located tables + mx_call_func_custom_types +--------------------------- + (F,S) +(1 row) + +-- Make sure we do bounds checking on distributed argument index +-- This also tests that we have cache invalidation for pg_dist_object updates +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, -1); + colocate_proc_with_table +-------------------------- + +(1 row) + +select mx_call_func(2, 0); +DEBUG: function call does not have a distribution argument +DEBUG: generating subplan 14_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 14 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('14_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +-------------- + 29 +(1 row) + +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 2); + colocate_proc_with_table +-------------------------- + +(1 row) + +select mx_call_func(2, 0); +DEBUG: function call does not have a distribution argument +DEBUG: generating subplan 17_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 17 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('17_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +-------------- + 29 +(1 row) + +-- We don't currently support colocating with reference tables +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_ref'::regclass, 1); + colocate_proc_with_table +-------------------------- + +(1 row) + +select mx_call_func(2, 0); +DEBUG: cannnot push down function call for reference tables +DEBUG: generating subplan 19_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 19 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('19_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +-------------- + 29 +(1 row) + +-- We don't currently support colocating with replicated tables +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_replica'::regclass, 1); + colocate_proc_with_table +-------------------------- + +(1 row) + +select mx_call_func(2, 0); +DEBUG: cannot push down function call for replicated distributed tables +DEBUG: generating subplan 21_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 21 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('21_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +-------------- + 29 +(1 row) + +SET client_min_messages TO NOTICE; +drop table mx_call_dist_table_replica; +SET client_min_messages TO DEBUG1; +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 1); + colocate_proc_with_table +-------------------------- + +(1 row) + +-- Test table returning functions. +CREATE FUNCTION mx_call_func_tbl(x int) +RETURNS TABLE (p0 int, p1 int) +LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 VALUES (x, -1), (x+1, 4); + UPDATE multi_mx_function_call_delegation.mx_call_dist_table_1 SET val = val+1 WHERE id >= x; + UPDATE multi_mx_function_call_delegation.mx_call_dist_table_1 SET val = val-1 WHERE id >= x; + RETURN QUERY + SELECT id, val + FROM multi_mx_function_call_delegation.mx_call_dist_table_1 t + WHERE id >= x + ORDER BY 1, 2; +END;$$; +-- before distribution ... +select mx_call_func_tbl(10); + mx_call_func_tbl +------------------ + (10,-1) + (11,4) +(2 rows) + +-- after distribution ... +select create_distributed_function('mx_call_func_tbl(int)', '$1', 'mx_call_dist_table_1'); +DEBUG: switching to sequential query execution mode +DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands + create_distributed_function +----------------------------- + +(1 row) + +select mx_call_func_tbl(20); +DEBUG: pushing down the function call + mx_call_func_tbl +------------------ + (20,-1) + (21,4) +(2 rows) + +-- Test that we properly propagate errors raised from procedures. +CREATE FUNCTION mx_call_func_raise(x int) +RETURNS void LANGUAGE plpgsql AS $$ +BEGIN + RAISE WARNING 'warning'; + RAISE EXCEPTION 'error'; +END;$$; +select create_distributed_function('mx_call_func_raise(int)', '$1', 'mx_call_dist_table_1'); +DEBUG: switching to sequential query execution mode +DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands + create_distributed_function +----------------------------- + +(1 row) + +select mx_call_func_raise(2); +DEBUG: pushing down the function call +DEBUG: warning +DETAIL: WARNING from localhost:57638 +ERROR: error +CONTEXT: while executing command on localhost:57638 +PL/pgSQL function multi_mx_function_call_delegation.mx_call_func_raise(integer) line 4 at RAISE +-- Test that we don't propagate to non-metadata worker nodes +select stop_metadata_sync_to_node('localhost', :worker_1_port); + stop_metadata_sync_to_node +---------------------------- + +(1 row) + +select stop_metadata_sync_to_node('localhost', :worker_2_port); + stop_metadata_sync_to_node +---------------------------- + +(1 row) + +select mx_call_func(2, 0); +DEBUG: the worker node does not have metadata +DEBUG: generating subplan 27_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 27 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('27_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +-------------- + 29 +(1 row) + +SET client_min_messages TO NOTICE; +select start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + +select start_metadata_sync_to_node('localhost', :worker_2_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + +-- stop_metadata_sync_to_node()/start_metadata_sync_to_node() might make +-- worker backend caches inconsistent. Reconnect to coordinator to use +-- new worker connections, hence new backends. +\c - - - :master_port +SET search_path to multi_mx_function_call_delegation, public; +SET client_min_messages TO DEBUG1; +-- +-- Test non-const parameter values +-- +CREATE FUNCTION mx_call_add(int, int) RETURNS int + AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE; +SELECT create_distributed_function('mx_call_add(int,int)', '$1'); +DEBUG: switching to sequential query execution mode +DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands + create_distributed_function +----------------------------- + +(1 row) + +-- subquery parameters cannot be pushed down +select mx_call_func((select x + 1 from mx_call_add(3, 4) x), 2); +DEBUG: arguments in a distributed function must not contain subqueries +DEBUG: generating subplan 1_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 1 query after replacing subqueries and CTEs: SELECT (9 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('1_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +-------------- + 35 +(1 row) + +-- volatile parameter cannot be pushed down +select mx_call_func(floor(random())::int, 2); +DEBUG: arguments in a distributed function must be constant expressions +DEBUG: generating subplan 3_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 3 query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('3_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +-------------- + 27 +(1 row) + +-- test forms we don't distribute +select * from mx_call_func(2, 0); +DEBUG: generating subplan 5_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 5 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('5_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + y +---- + 29 +(1 row) + +select mx_call_func(2, 0) from mx_call_dist_table_1; + mx_call_func +-------------- + 28 + 28 + 28 + 28 + 28 + 28 + 28 + 28 + 28 +(9 rows) + +select mx_call_func(2, 0) where mx_call_func(0, 2) = 0; +DEBUG: generating subplan 8_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 8 query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +-------------- +(0 rows) + +select mx_call_func(2, 0), mx_call_func(0, 2); +DEBUG: generating subplan 10_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: generating subplan 13_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 13 query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('13_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func | mx_call_func +--------------+-------------- + 29 | 27 +(1 row) + +DO $$ BEGIN perform mx_call_func_tbl(40); END; $$; +DEBUG: not pushing down function calls in a multi-statement transaction +CONTEXT: SQL statement "SELECT mx_call_func_tbl(40)" +PL/pgSQL function inline_code_block line 1 at PERFORM +SELECT * FROM mx_call_dist_table_1 WHERE id >= 40 ORDER BY id, val; + id | val +----+----- + 40 | -1 + 41 | 4 +(2 rows) + +-- Prepared statements +PREPARE call_plan (int, int) AS SELECT mx_call_func($1, $2); +EXECUTE call_plan(2, 0); +DEBUG: pushing down the function call + mx_call_func +-------------- + 28 +(1 row) + +RESET client_min_messages; +\set VERBOSITY terse +DROP SCHEMA multi_mx_function_call_delegation CASCADE; +NOTICE: drop cascades to 10 other objects diff --git a/src/test/regress/expected/multi_unsupported_worker_operations_0.out b/src/test/regress/expected/multi_unsupported_worker_operations_0.out new file mode 100644 index 000000000..622cb026e --- /dev/null +++ b/src/test/regress/expected/multi_unsupported_worker_operations_0.out @@ -0,0 +1,367 @@ +-- +-- MULTI_UNSUPPORTED_WORKER_OPERATIONS +-- +-- Tests for ensuring unsupported functions on workers error out. +SET citus.next_shard_id TO 1270000; +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 1370000; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 1370000; +-- Set the colocation id to a safe value so that +-- it is not affected by future changes to colocation id sequence +SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gset +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 150000; +-- Prepare the environment +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; +SET citus.shard_count TO 5; +-- Create test tables +CREATE TABLE mx_table (col_1 int, col_2 text, col_3 BIGSERIAL); +SELECT create_distributed_table('mx_table', 'col_1'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE mx_table_2 (col_1 int, col_2 text, col_3 BIGSERIAL); +SELECT create_distributed_table('mx_table_2', 'col_1'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE mx_ref_table (col_1 int, col_2 text); +SELECT create_reference_table('mx_ref_table'); + create_reference_table +------------------------ + +(1 row) + +-- Check that the created tables are colocated MX tables +SELECT logicalrelid, repmodel, colocationid +FROM pg_dist_partition +WHERE logicalrelid IN ('mx_table'::regclass, 'mx_table_2'::regclass) +ORDER BY logicalrelid; + logicalrelid | repmodel | colocationid +--------------+----------+-------------- + mx_table | s | 150000 + mx_table_2 | s | 150000 +(2 rows) + +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + +COPY mx_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv'); +INSERT INTO mx_ref_table VALUES (-37, 'morbi'); +INSERT INTO mx_ref_table VALUES (-78, 'sapien'); +INSERT INTO mx_ref_table VALUES (-34, 'augue'); +SELECT * FROM mx_table ORDER BY col_1; + col_1 | col_2 | col_3 +-------+----------+------- + -37 | 'lorem' | 1 + 80 | 'dolor' | 3 + 7344 | 'sit' | 4 + 65536 | 'ipsum' | 2 + 65832 | 'amet' | 5 +(5 rows) + +-- Try commands from metadata worker +\c - - - :worker_1_port +CREATE TABLE mx_table_worker(col_1 text); +-- master_create_distributed_table +SELECT master_create_distributed_table('mx_table_worker', 'col_1', 'hash'); +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. +-- create_distributed_table +SELECT create_distributed_table('mx_table_worker', 'col_1'); +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. +-- create_reference_table +SELECT create_reference_table('mx_table_worker'); +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. +SELECT count(*) FROM pg_dist_partition WHERE logicalrelid='mx_table_worker'::regclass; + count +------- + 0 +(1 row) + +DROP TABLE mx_table_worker; +-- master_create_worker_shards +CREATE TEMP TABLE pg_dist_shard_temp AS +SELECT * FROM pg_dist_shard WHERE logicalrelid = 'mx_table'::regclass; +DELETE FROM pg_dist_shard WHERE logicalrelid = 'mx_table'::regclass; +SELECT master_create_worker_shards('mx_table', 5, 1); +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='mx_table'::regclass; + count +------- + 0 +(1 row) + +INSERT INTO pg_dist_shard SELECT * FROM pg_dist_shard_temp; +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='mx_table'::regclass; + count +------- + 5 +(1 row) + +\c - - - :master_port +DROP TABLE mx_ref_table; +CREATE UNIQUE INDEX mx_test_uniq_index ON mx_table(col_1); +\c - - - :worker_1_port +-- DDL commands +SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_table'::regclass; + Column | Type | Modifiers +--------+---------+---------------------------------------------------------- + col_1 | integer | + col_2 | text | + col_3 | bigint | not null default nextval('mx_table_col_3_seq'::regclass) +(3 rows) + +CREATE INDEX mx_test_index ON mx_table(col_2); +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. +ALTER TABLE mx_table ADD COLUMN col_4 int; +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. +ALTER TABLE mx_table_2 ADD CONSTRAINT mx_fk_constraint FOREIGN KEY(col_1) REFERENCES mx_table(col_1); +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. +SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_table'::regclass; + Column | Type | Modifiers +--------+---------+---------------------------------------------------------- + col_1 | integer | + col_2 | text | + col_3 | bigint | not null default nextval('mx_table_col_3_seq'::regclass) +(3 rows) + +\d mx_test_index +-- master_drop_all_shards +SELECT master_drop_all_shards('mx_table'::regclass, 'public', 'mx_table'); +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. +SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid='mx_table'::regclass; + count +------- + 5 +(1 row) + +-- master_apply_delete_command +SELECT master_apply_delete_command('DELETE FROM mx_table'); +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. +SELECT count(*) FROM mx_table; + count +------- + 5 +(1 row) + +-- master_add_inactive_node +SELECT 1 FROM master_add_inactive_node('localhost', 5432); +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. +SELECT count(1) FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; + count +------- + 0 +(1 row) + +-- master_remove_node +\c - - - :master_port +DROP INDEX mx_test_uniq_index; +SELECT 1 FROM master_add_inactive_node('localhost', 5432); + ?column? +---------- + 1 +(1 row) + +\c - - - :worker_1_port +SELECT master_remove_node('localhost', 5432); +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. +SELECT count(1) FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; + count +------- + 1 +(1 row) + +\c - - - :master_port +SELECT master_remove_node('localhost', 5432); + master_remove_node +-------------------- + +(1 row) + +\c - - - :worker_1_port +-- mark_tables_colocated +UPDATE pg_dist_partition SET colocationid = 0 WHERE logicalrelid='mx_table_2'::regclass; +SELECT mark_tables_colocated('mx_table', ARRAY['mx_table_2']); +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. +SELECT colocationid FROM pg_dist_partition WHERE logicalrelid='mx_table_2'::regclass; + colocationid +-------------- + 0 +(1 row) + +SELECT colocationid AS old_colocation_id +FROM pg_dist_partition +WHERE logicalrelid='mx_table'::regclass \gset +UPDATE pg_dist_partition +SET colocationid = :old_colocation_id +WHERE logicalrelid='mx_table_2'::regclass; +-- start_metadata_sync_to_node +SELECT start_metadata_sync_to_node('localhost', :worker_2_port); +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. +SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; + hasmetadata +------------- + f +(1 row) + +-- stop_metadata_sync_to_node +\c - - - :master_port +SELECT start_metadata_sync_to_node('localhost', :worker_2_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + +\c - - - :worker_1_port +SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. +\c - - - :master_port +SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; + hasmetadata +------------- + t +(1 row) + +SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); + stop_metadata_sync_to_node +---------------------------- + +(1 row) + +SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; + hasmetadata +------------- + f +(1 row) + +\c - - - :worker_2_port +SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition; + worker_drop_distributed_table +------------------------------- + + +(2 rows) + +DELETE FROM pg_dist_node; +\c - - - :worker_1_port +-- DROP TABLE +DROP TABLE mx_table; +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. +CONTEXT: SQL statement "SELECT master_remove_distributed_table_metadata_from_workers(v_obj.objid, v_obj.schema_name, v_obj.object_name)" +PL/pgSQL function citus_drop_trigger() line 18 at PERFORM +SELECT count(*) FROM mx_table; + count +------- + 5 +(1 row) + +-- master_drop_distributed_table_metadata +SELECT master_remove_distributed_table_metadata_from_workers('mx_table'::regclass, 'public', 'mx_table'); +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. +SELECT master_remove_partition_metadata('mx_table'::regclass, 'public', 'mx_table'); +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. +SELECT count(*) FROM mx_table; + count +------- + 5 +(1 row) + +-- master_copy_shard_placement +SELECT logicalrelid, shardid AS testshardid, nodename, nodeport +FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE logicalrelid = 'mx_table'::regclass AND nodeport=:worker_1_port +ORDER BY shardid +LIMIT 1 \gset +SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport = :worker_2_port \gset +INSERT INTO pg_dist_placement (groupid, shardid, shardstate, shardlength) +VALUES (:worker_2_group, :testshardid, 3, 0); +SELECT master_copy_shard_placement(:testshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. +SELECT shardid, nodename, nodeport, shardstate +FROM pg_dist_shard_placement +WHERE shardid = :testshardid +ORDER BY nodeport; + shardid | nodename | nodeport | shardstate +---------+-----------+----------+------------ + 1270000 | localhost | 57637 | 1 + 1270000 | localhost | 57638 | 3 +(2 rows) + +DELETE FROM pg_dist_placement WHERE groupid = :worker_2_group AND shardid = :testshardid; +-- master_get_new_placementid +SELECT master_get_new_placementid(); +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. +-- Show that sequences can be created and dropped on worker nodes +CREATE TABLE some_table_with_sequence(a int, b BIGSERIAL, c BIGSERIAL); +DROP TABLE some_table_with_sequence; +CREATE SEQUENCE some_sequence; +DROP SEQUENCE some_sequence; +-- Show that dropping the sequence of an MX table with cascade harms the table and shards +BEGIN; +SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_table'::regclass; + Column | Type | Modifiers +--------+---------+---------------------------------------------------------- + col_1 | integer | + col_2 | text | + col_3 | bigint | not null default nextval('mx_table_col_3_seq'::regclass) +(3 rows) + +-- suppress notice message caused by DROP ... CASCADE to prevent pg version difference +SET client_min_messages TO 'WARNING'; +DROP SEQUENCE mx_table_col_3_seq CASCADE; +RESET client_min_messages; +SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_table'::regclass; + Column | Type | Modifiers +--------+---------+----------- + col_1 | integer | + col_2 | text | + col_3 | bigint | not null +(3 rows) + +ROLLBACK; +-- Cleanup +\c - - - :master_port +DROP TABLE mx_table; +DROP TABLE mx_table_2; +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); + stop_metadata_sync_to_node +---------------------------- + +(1 row) + +\c - - - :worker_1_port +DELETE FROM pg_dist_node; +SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition; + worker_drop_distributed_table +------------------------------- +(0 rows) + +\c - - - :master_port +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id; +RESET citus.shard_replication_factor; +RESET citus.replication_model;