diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index 863aded67..4251de854 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -81,27 +81,27 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, if (IsMultiStatementTransaction()) { - ereport(DEBUG2, (errmsg("cannot push down CALL in multi-statement transaction"))); + ereport(DEBUG1, (errmsg("cannot push down CALL in multi-statement transaction"))); return false; } colocatedRelationId = ColocatedTableId(procedure->colocationId); if (colocatedRelationId == InvalidOid) { - ereport(DEBUG2, (errmsg("stored procedure does not have co-located tables"))); + ereport(DEBUG1, (errmsg("stored procedure does not have co-located tables"))); return false; } if (procedure->distributionArgIndex < 0 || procedure->distributionArgIndex >= list_length(funcExpr->args)) { - ereport(DEBUG2, (errmsg("cannot push down invalid distribution_argument_index"))); + ereport(DEBUG1, (errmsg("cannot push down invalid distribution_argument_index"))); return false; } if (contain_volatile_functions((Node *) funcExpr->args)) { - ereport(DEBUG2, (errmsg("arguments in a distributed stored procedure must " + ereport(DEBUG1, (errmsg("arguments in a distributed stored procedure must " "be constant expressions"))); return false; } @@ -119,7 +119,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, partitionValue = (Const *) list_nth(funcExpr->args, procedure->distributionArgIndex); if (!IsA(partitionValue, Const)) { - ereport(DEBUG2, (errmsg("distribution argument value must be a constant"))); + ereport(DEBUG1, (errmsg("distribution argument value must be a constant"))); return false; } @@ -137,7 +137,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, shardInterval = FindShardInterval(partitionValueDatum, distTable); if (shardInterval == NULL) { - ereport(DEBUG2, (errmsg("cannot push down call, failed to find shard interval"))); + ereport(DEBUG1, (errmsg("cannot push down call, failed to find shard interval"))); return false; } @@ -154,10 +154,12 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, workerNode = FindWorkerNode(placement->nodeName, placement->nodePort); if (workerNode == NULL || !workerNode->hasMetadata || !workerNode->metadataSynced) { - ereport(DEBUG2, (errmsg("there is no worker node with metadata"))); + ereport(DEBUG1, (errmsg("there is no worker node with metadata"))); return false; } + ereport(DEBUG1, (errmsg("pushing down the procedure"))); + { Tuplestorestate *tupleStore = tuplestore_begin_heap(true, false, work_mem); TupleDesc tupleDesc = CallStmtResultDesc(callStmt); diff --git a/src/test/regress/expected/multi_mx_call.out b/src/test/regress/expected/multi_mx_call.out index a77447c69..4d4217bc4 100644 --- a/src/test/regress/expected/multi_mx_call.out +++ b/src/test/regress/expected/multi_mx_call.out @@ -1,172 +1,405 @@ -- Test passing off CALL to mx workers -- Create worker-local tables to test procedure calls were routed -set citus.shard_replication_factor to 1; -set citus.replication_model to 'streaming'; -CREATE TABLE mx_call_dist_table(id int); -select create_distributed_table('mx_call_dist_table', 'id'); +set citus.shard_replication_factor to 2; +set citus.replication_model to 'statement'; +-- This table requires specific settings, create before getting into things +create table mx_call_dist_table_replica(id int, val int); +select create_distributed_table('mx_call_dist_table_replica', 'id'); create_distributed_table -------------------------- (1 row) -insert into mx_call_dist_table values (1),(2),(3),(4),(5); -create type mx_call_enum as enum ('A', 'S', 'D', 'F'); -CREATE PROCEDURE mx_call_proc(x int, INOUT y int) LANGUAGE plpgsql AS $$ +insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5); +set citus.shard_replication_factor to 1; +set citus.replication_model to 'streaming'; +create schema multi_mx_call; +set search_path to multi_mx_call, public; +-- +-- Utility UDFs +-- +-- 1. Marks the given procedure as colocated with the given table. +-- 2. Marks the argument index with which we route the procedure. +CREATE PROCEDURE colocate_proc_with_table(procname text, tablerelid regclass, argument_index int) +LANGUAGE plpgsql AS $$ BEGIN + update citus.pg_dist_object + set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid + from pg_proc, pg_dist_partition + where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid; +END;$$; +-- +-- Create tables and procedures we want to use in tests +-- +create table mx_call_dist_table_1(id int, val int); +select create_distributed_table('mx_call_dist_table_1', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +insert into mx_call_dist_table_1 values (3,1),(4,5),(9,2),(6,5),(3,5); +create table mx_call_dist_table_2(id int, val int); +select create_distributed_table('mx_call_dist_table_2', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +insert into mx_call_dist_table_2 values (1,1),(1,2),(2,2),(3,3),(3,4); +create table mx_call_dist_table_ref(id int, val int); +select create_reference_table('mx_call_dist_table_ref'); + create_reference_table +------------------------ + +(1 row) + +insert into mx_call_dist_table_ref values (2,7),(1,8),(2,8),(1,8),(2,8); +create type mx_call_enum as enum ('A', 'S', 'D', 'F'); +create table mx_call_dist_table_enum(id int, key mx_call_enum); +select create_distributed_table('mx_call_dist_table_enum', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +insert into mx_call_dist_table_enum values (1,'S'),(2,'A'),(3,'D'),(4,'F'); +CREATE PROCEDURE mx_call_proc(x int, INOUT y int) +LANGUAGE plpgsql AS $$ +BEGIN + -- groupid is 0 in coordinator and non-zero in workers, so by using it here + -- we make sure the procedure is being executed in the worker. y := x + (select case groupid when 0 then 1 else 0 end from pg_dist_local_group); - y := y + (select sum(id) from mx_call_dist_table); -END; -$$; -CREATE PROCEDURE mx_call_proc_asdf(INOUT x mx_call_enum, INOUT y mx_call_enum) LANGUAGE plpgsql AS $$ + -- we also make sure that we can run distributed queries in the procedures + -- that are routed to the workers. + y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id); +END;$$; +-- create another procedure which verifies: +-- 1. we work fine with multiple return columns +-- 2. we work fine in combination with custom types +CREATE PROCEDURE mx_call_proc_custom_types(INOUT x mx_call_enum, INOUT y mx_call_enum) +LANGUAGE plpgsql AS $$ BEGIN y := x; x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); -END; -$$; -CREATE FUNCTION mx_call_add(int, int) RETURNS int - AS 'select $1 + $2;' - LANGUAGE SQL - IMMUTABLE - RETURNS NULL ON NULL INPUT; -call mx_call_proc(2, 0); +END;$$; +-- Test that undistributed procedures have no issue executing +call multi_mx_call.mx_call_proc(2, 0); y ---- - 18 + 29 (1 row) -call mx_call_proc_asdf('S', 'A'); +call multi_mx_call.mx_call_proc_custom_types('S', 'A'); x | y ---+--- F | S (1 row) +-- Mark both procedures as distributed ... select create_distributed_function('mx_call_proc(int,int)'); create_distributed_function ----------------------------- (1 row) -update citus.pg_dist_object -set distribution_argument_index = 1, colocationid = pg_dist_partition.colocationid -from pg_proc, pg_dist_partition -where proname = 'mx_call_proc' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass; -select create_distributed_function('mx_call_proc_asdf(mx_call_enum,mx_call_enum)'); +select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_call_enum)'); create_distributed_function ----------------------------- (1 row) -update citus.pg_dist_object -set distribution_argument_index = 1, colocationid = pg_dist_partition.colocationid -from pg_proc, pg_dist_partition -where proname = 'mx_call_proc_asdf' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass; -call mx_call_proc(2, 0); +-- We still don't route them to the workers, because they aren't +-- colocated with any distributed tables. +SET client_min_messages TO DEBUG1; +call multi_mx_call.mx_call_proc(2, 0); +DEBUG: stored procedure does not have co-located tables +DEBUG: generating subplan 8_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment +DEBUG: Plan 8 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment y ---- - 17 + 29 (1 row) -call mx_call_proc_asdf('S', 'A'); +call multi_mx_call.mx_call_proc_custom_types('S', 'A'); +DEBUG: stored procedure does not have co-located tables + x | y +---+--- + F | S +(1 row) + +-- Mark them as colocated with a table. Now we should route them to workers. +call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); +call multi_mx_call.colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1); +call multi_mx_call.mx_call_proc(2, 0); +DEBUG: pushing down the procedure + y +---- + 28 +(1 row) + +call multi_mx_call.mx_call_proc_custom_types('S', 'A'); +DEBUG: pushing down the procedure x | y ---+--- S | S (1 row) -set client_min_messages to DEBUG2; +-- We don't allow distributing calls inside transactions begin; -select sum(id) from mx_call_dist_table; -DEBUG: Router planner cannot handle multi-shard select queries - sum ------ - 15 -(1 row) - -call mx_call_proc(2, 0); +call multi_mx_call.mx_call_proc(2, 0); DEBUG: cannot push down CALL in multi-statement transaction -DEBUG: Router planner cannot handle multi-shard select queries -CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)" -PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment -DEBUG: Router planner cannot handle multi-shard select queries -CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)" -PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment -DEBUG: generating subplan 5_1 for subquery SELECT sum(id) AS sum FROM public.mx_call_dist_table -CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)" -PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment -DEBUG: Plan 5 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('5_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)" -PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment -DEBUG: Creating router plan -CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)" -PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment -DEBUG: Plan is router executable -CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)" -PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment +DEBUG: generating subplan 10_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment +DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment y ---- - 18 + 29 (1 row) commit; -update citus.pg_dist_object -set distribution_argument_index = -1, colocationid = pg_dist_partition.colocationid -from pg_proc, pg_dist_partition -where proname = 'mx_call_proc' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass; -call mx_call_proc(2, 0); +-- Drop the table colocated with mx_call_proc_custom_types. Now it shouldn't +-- be routed to workers anymore. +SET client_min_messages TO NOTICE; +drop table mx_call_dist_table_enum; +SET client_min_messages TO DEBUG1; +call multi_mx_call.mx_call_proc_custom_types('S', 'A'); +DEBUG: stored procedure does not have co-located tables + x | y +---+--- + F | S +(1 row) + +-- Make sure we do bounds checking on distributed argument index +-- This also tests that we have cache invalidation for pg_dist_object updates +call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1); +call multi_mx_call.mx_call_proc(2, 0); DEBUG: cannot push down invalid distribution_argument_index -DEBUG: Router planner cannot handle multi-shard select queries -CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)" -PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment -DEBUG: Router planner cannot handle multi-shard select queries -CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)" -PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment -DEBUG: generating subplan 7_1 for subquery SELECT sum(id) AS sum FROM public.mx_call_dist_table -CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)" -PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment -DEBUG: Plan 7 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('7_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)" -PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment -DEBUG: Creating router plan -CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)" -PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment -DEBUG: Plan is router executable -CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)" -PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment +DEBUG: generating subplan 12_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment +DEBUG: Plan 12 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('12_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment y ---- - 18 + 29 (1 row) -update citus.pg_dist_object -set distribution_argument_index = 1, colocationid = pg_dist_partition.colocationid -from pg_proc, pg_dist_partition -where proname = 'mx_call_proc' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass; -call mx_call_proc(2, mx_call_add(3, 4)); -DEBUG: cannot push down non-constant argument value -DEBUG: Router planner cannot handle multi-shard select queries -CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)" -PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment -DEBUG: Router planner cannot handle multi-shard select queries -CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)" -PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment -DEBUG: generating subplan 9_1 for subquery SELECT sum(id) AS sum FROM public.mx_call_dist_table -CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)" -PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment -DEBUG: Plan 9 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('9_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)" -PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment -DEBUG: Creating router plan -CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)" -PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment -DEBUG: Plan is router executable -CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)" -PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment +call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2); +call multi_mx_call.mx_call_proc(2, 0); +DEBUG: cannot push down invalid distribution_argument_index +DEBUG: generating subplan 14_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment +DEBUG: Plan 14 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('14_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment y ---- - 18 + 29 (1 row) +-- We don't currently support colocating with reference tables +call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1); +call multi_mx_call.mx_call_proc(2, 0); +DEBUG: cannot push down CALL for reference tables +DEBUG: generating subplan 17_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment +DEBUG: Plan 17 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('17_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment + y +---- + 29 +(1 row) + +-- We don't currently support colocating with replicated tables +call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1); +call multi_mx_call.mx_call_proc(2, 0); +DEBUG: cannot push down CALL for replicated distributed tables +DEBUG: generating subplan 19_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment +DEBUG: Plan 19 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('19_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment + y +---- + 29 +(1 row) + +SET client_min_messages TO NOTICE; +drop table mx_call_dist_table_replica; +SET client_min_messages TO DEBUG1; +call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); +-- Test that we handle transactional constructs correctly inside a procedure +-- that is routed to the workers. +CREATE PROCEDURE mx_call_proc_tx(x int) LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO mx_call_dist_table_1 VALUES (x, -1), (x+1, 4); + COMMIT; + UPDATE mx_call_dist_table_1 SET val = val+1 WHERE id >= x; + ROLLBACK; + -- Now do the final update! + UPDATE mx_call_dist_table_1 SET val = val-1 WHERE id >= x; +END;$$; +-- before distribution ... +CALL multi_mx_call.mx_call_proc_tx(10); +-- after distribution ... +select create_distributed_function('mx_call_proc_tx(int)'); +DEBUG: switching to sequential query execution mode +DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands + create_distributed_function +----------------------------- + +(1 row) + +call multi_mx_call.colocate_proc_with_table('mx_call_proc_tx', 'mx_call_dist_table_1'::regclass, 0); +CALL multi_mx_call.mx_call_proc_tx(20); +DEBUG: pushing down the procedure +ERROR: relation "mx_call_dist_table_1" does not exist +CONTEXT: while executing command on localhost:57637 +PL/pgSQL function multi_mx_call.mx_call_proc_tx(integer) line 3 at SQL statement +SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val; + id | val +----+----- + 3 | 1 + 3 | 5 + 4 | 5 + 6 | 5 + 9 | 2 + 10 | -2 + 11 | 3 +(7 rows) + +-- Test that we properly propagate errors raised from procedures. +CREATE PROCEDURE mx_call_proc_raise(x int) LANGUAGE plpgsql AS $$ +BEGIN + RAISE WARNING 'warning'; + RAISE EXCEPTION 'error'; +END;$$; +select create_distributed_function('mx_call_proc_raise(int)'); +DEBUG: switching to sequential query execution mode +DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands + create_distributed_function +----------------------------- + +(1 row) + +call multi_mx_call.colocate_proc_with_table('mx_call_proc_raise', 'mx_call_dist_table_1'::regclass, 0); +call multi_mx_call.mx_call_proc_raise(2); +DEBUG: pushing down the procedure +DEBUG: warning +DETAIL: WARNING from localhost:57638 +ERROR: error +CONTEXT: while executing command on localhost:57638 +PL/pgSQL function multi_mx_call.mx_call_proc_raise(integer) line 4 at RAISE +-- Test that we don't propagate to non-metadata worker nodes +select stop_metadata_sync_to_node('localhost', :worker_1_port); + stop_metadata_sync_to_node +---------------------------- + +(1 row) + +select stop_metadata_sync_to_node('localhost', :worker_2_port); + stop_metadata_sync_to_node +---------------------------- + +(1 row) + +call multi_mx_call.mx_call_proc(2, 0); +DEBUG: there is no worker node with metadata +DEBUG: generating subplan 25_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment +DEBUG: Plan 25 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('25_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment + y +---- + 29 +(1 row) + +SET client_min_messages TO NOTICE; +select start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + +select start_metadata_sync_to_node('localhost', :worker_2_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + +SET client_min_messages TO DEBUG1; +-- +-- Test non-const parameter values +-- +CREATE FUNCTION mx_call_add(int, int) RETURNS int + AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE; +SELECT create_distributed_function('mx_call_add(int,int)'); +DEBUG: switching to sequential query execution mode +DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands + create_distributed_function +----------------------------- + +(1 row) + +-- non-const distribution parameters cannot be pushed down +call multi_mx_call.mx_call_proc(2, mx_call_add(3, 4)); +DEBUG: distribution argument value must be a constant +DEBUG: generating subplan 27_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment +DEBUG: Plan 27 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('27_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment + y +---- + 29 +(1 row) + +-- non-const parameter can be pushed down +call multi_mx_call.mx_call_proc(multi_mx_call.mx_call_add(3, 4), 2); +DEBUG: pushing down the procedure + y +---- + 33 +(1 row) + +-- volatile parameter cannot be pushed down +call multi_mx_call.mx_call_proc(floor(random())::int, 2); +DEBUG: arguments in a distributed stored procedure must be constant expressions +DEBUG: generating subplan 29_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment +DEBUG: Plan 29 query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('29_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment + y +---- + 27 +(1 row) + +-- +-- clean-up +-- reset client_min_messages; -DROP TABLE mx_call_dist_table; -DROP PROCEDURE mx_call_proc; reset citus.shard_replication_factor; reset citus.replication_model; +reset search_path; +\set VERBOSITY terse +drop schema multi_mx_call cascade; +NOTICE: drop cascades to 10 other objects +\set VERBOSITY default diff --git a/src/test/regress/expected/multi_mx_call_0.out b/src/test/regress/expected/multi_mx_call_0.out index 9677b4fca..6a8343d93 100644 --- a/src/test/regress/expected/multi_mx_call_0.out +++ b/src/test/regress/expected/multi_mx_call_0.out @@ -1,106 +1,332 @@ -- Test passing off CALL to mx workers -- Create worker-local tables to test procedure calls were routed -set citus.shard_replication_factor to 1; -set citus.replication_model to 'streaming'; -CREATE TABLE mx_call_dist_table(id int); -select create_distributed_table('mx_call_dist_table', 'id'); +set citus.shard_replication_factor to 2; +set citus.replication_model to 'statement'; +-- This table requires specific settings, create before getting into things +create table mx_call_dist_table_replica(id int, val int); +select create_distributed_table('mx_call_dist_table_replica', 'id'); create_distributed_table -------------------------- (1 row) -insert into mx_call_dist_table values (1),(2),(3),(4),(5); -create type mx_call_enum as enum ('A', 'S', 'D', 'F'); -CREATE PROCEDURE mx_call_proc(x int, INOUT y int) LANGUAGE plpgsql AS $$ +insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5); +set citus.shard_replication_factor to 1; +set citus.replication_model to 'streaming'; +create schema multi_mx_call; +set search_path to multi_mx_call, public; +-- +-- Utility UDFs +-- +-- 1. Marks the given procedure as colocated with the given table. +-- 2. Marks the argument index with which we route the procedure. +CREATE PROCEDURE colocate_proc_with_table(procname text, tablerelid regclass, argument_index int) +LANGUAGE plpgsql AS $$ BEGIN - y := x + (select case groupid when 0 then 1 else 0 end from pg_dist_local_group); - y := y + (select sum(id) from mx_call_dist_table); -END; -$$; + update citus.pg_dist_object + set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid + from pg_proc, pg_dist_partition + where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid; +END;$$; ERROR: syntax error at or near "PROCEDURE" -LINE 1: CREATE PROCEDURE mx_call_proc(x int, INOUT y int) LANGUAGE p... +LINE 1: CREATE PROCEDURE colocate_proc_with_table(procname text, tab... ^ -CREATE PROCEDURE mx_call_proc_asdf(INOUT x mx_call_enum, INOUT y mx_call_enum) LANGUAGE plpgsql AS $$ +-- +-- Create tables and procedures we want to use in tests +-- +create table mx_call_dist_table_1(id int, val int); +select create_distributed_table('mx_call_dist_table_1', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +insert into mx_call_dist_table_1 values (3,1),(4,5),(9,2),(6,5),(3,5); +create table mx_call_dist_table_2(id int, val int); +select create_distributed_table('mx_call_dist_table_2', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +insert into mx_call_dist_table_2 values (1,1),(1,2),(2,2),(3,3),(3,4); +create table mx_call_dist_table_ref(id int, val int); +select create_reference_table('mx_call_dist_table_ref'); + create_reference_table +------------------------ + +(1 row) + +insert into mx_call_dist_table_ref values (2,7),(1,8),(2,8),(1,8),(2,8); +create type mx_call_enum as enum ('A', 'S', 'D', 'F'); +create table mx_call_dist_table_enum(id int, key mx_call_enum); +select create_distributed_table('mx_call_dist_table_enum', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +insert into mx_call_dist_table_enum values (1,'S'),(2,'A'),(3,'D'),(4,'F'); +CREATE PROCEDURE mx_call_proc(x int, INOUT y int) +LANGUAGE plpgsql AS $$ +BEGIN + -- groupid is 0 in coordinator and non-zero in workers, so by using it here + -- we make sure the procedure is being executed in the worker. + y := x + (select case groupid when 0 then 1 else 0 end from pg_dist_local_group); + -- we also make sure that we can run distributed queries in the procedures + -- that are routed to the workers. + y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id); +END;$$; +ERROR: syntax error at or near "PROCEDURE" +LINE 1: CREATE PROCEDURE mx_call_proc(x int, INOUT y int) + ^ +-- create another procedure which verifies: +-- 1. we work fine with multiple return columns +-- 2. we work fine in combination with custom types +CREATE PROCEDURE mx_call_proc_custom_types(INOUT x mx_call_enum, INOUT y mx_call_enum) +LANGUAGE plpgsql AS $$ BEGIN y := x; x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); -END; -$$; +END;$$; ERROR: syntax error at or near "PROCEDURE" -LINE 1: CREATE PROCEDURE mx_call_proc_asdf(INOUT x mx_call_enum, INO... +LINE 1: CREATE PROCEDURE mx_call_proc_custom_types(INOUT x mx_call_e... ^ -CREATE FUNCTION mx_call_add(int, int) RETURNS int - AS 'select $1 + $2;' - LANGUAGE SQL - IMMUTABLE - RETURNS NULL ON NULL INPUT; -call mx_call_proc(2, 0); +-- Test that undistributed procedures have no issue executing +call multi_mx_call.mx_call_proc(2, 0); ERROR: syntax error at or near "call" -LINE 1: call mx_call_proc(2, 0); +LINE 1: call multi_mx_call.mx_call_proc(2, 0); ^ -call mx_call_proc_asdf('S', 'A'); +call multi_mx_call.mx_call_proc_custom_types('S', 'A'); ERROR: syntax error at or near "call" -LINE 1: call mx_call_proc_asdf('S', 'A'); +LINE 1: call multi_mx_call.mx_call_proc_custom_types('S', 'A'); ^ +-- Mark both procedures as distributed ... select create_distributed_function('mx_call_proc(int,int)'); ERROR: function "mx_call_proc(int,int)" does not exist LINE 1: select create_distributed_function('mx_call_proc(int,int)'); ^ -update citus.pg_dist_object -set distribution_argument_index = 1, colocationid = pg_dist_partition.colocationid -from pg_proc, pg_dist_partition -where proname = 'mx_call_proc' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass; -select create_distributed_function('mx_call_proc_asdf(mx_call_enum,mx_call_enum)'); -ERROR: function "mx_call_proc_asdf(mx_call_enum,mx_call_enum)" does not exist -LINE 1: select create_distributed_function('mx_call_proc_asdf(mx_cal... +select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_call_enum)'); +ERROR: function "mx_call_proc_custom_types(mx_call_enum,mx_call_enum)" does not exist +LINE 1: select create_distributed_function('mx_call_proc_custom_type... ^ -update citus.pg_dist_object -set distribution_argument_index = 1, colocationid = pg_dist_partition.colocationid -from pg_proc, pg_dist_partition -where proname = 'mx_call_proc_asdf' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass; -call mx_call_proc(2, 0); +-- We still don't route them to the workers, because they aren't +-- colocated with any distributed tables. +SET client_min_messages TO DEBUG1; +call multi_mx_call.mx_call_proc(2, 0); ERROR: syntax error at or near "call" -LINE 1: call mx_call_proc(2, 0); +LINE 1: call multi_mx_call.mx_call_proc(2, 0); ^ -call mx_call_proc_asdf('S', 'A'); +call multi_mx_call.mx_call_proc_custom_types('S', 'A'); ERROR: syntax error at or near "call" -LINE 1: call mx_call_proc_asdf('S', 'A'); +LINE 1: call multi_mx_call.mx_call_proc_custom_types('S', 'A'); ^ -set client_min_messages to DEBUG2; +-- Mark them as colocated with a table. Now we should route them to workers. +call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); +ERROR: syntax error at or near "call" +LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ... + ^ +call multi_mx_call.colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1); +ERROR: syntax error at or near "call" +LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc_cu... + ^ +call multi_mx_call.mx_call_proc(2, 0); +ERROR: syntax error at or near "call" +LINE 1: call multi_mx_call.mx_call_proc(2, 0); + ^ +call multi_mx_call.mx_call_proc_custom_types('S', 'A'); +ERROR: syntax error at or near "call" +LINE 1: call multi_mx_call.mx_call_proc_custom_types('S', 'A'); + ^ +-- We don't allow distributing calls inside transactions begin; -select sum(id) from mx_call_dist_table; -DEBUG: Router planner cannot handle multi-shard select queries - sum ------ - 15 -(1 row) - -call mx_call_proc(2, 0); +call multi_mx_call.mx_call_proc(2, 0); ERROR: syntax error at or near "call" -LINE 1: call mx_call_proc(2, 0); +LINE 1: call multi_mx_call.mx_call_proc(2, 0); ^ commit; -update citus.pg_dist_object -set distribution_argument_index = -1, colocationid = pg_dist_partition.colocationid -from pg_proc, pg_dist_partition -where proname = 'mx_call_proc' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass; -call mx_call_proc(2, 0); +-- Drop the table colocated with mx_call_proc_custom_types. Now it shouldn't +-- be routed to workers anymore. +SET client_min_messages TO NOTICE; +drop table mx_call_dist_table_enum; +SET client_min_messages TO DEBUG1; +call multi_mx_call.mx_call_proc_custom_types('S', 'A'); ERROR: syntax error at or near "call" -LINE 1: call mx_call_proc(2, 0); +LINE 1: call multi_mx_call.mx_call_proc_custom_types('S', 'A'); ^ -update citus.pg_dist_object -set distribution_argument_index = 1, colocationid = pg_dist_partition.colocationid -from pg_proc, pg_dist_partition -where proname = 'mx_call_proc' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass; -call mx_call_proc(2, mx_call_add(3, 4)); +-- Make sure we do bounds checking on distributed argument index +-- This also tests that we have cache invalidation for pg_dist_object updates +call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1); ERROR: syntax error at or near "call" -LINE 1: call mx_call_proc(2, mx_call_add(3, 4)); +LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ... ^ -reset client_min_messages; -DROP TABLE mx_call_dist_table; -DROP PROCEDURE mx_call_proc; +call multi_mx_call.mx_call_proc(2, 0); +ERROR: syntax error at or near "call" +LINE 1: call multi_mx_call.mx_call_proc(2, 0); + ^ +call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2); +ERROR: syntax error at or near "call" +LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ... + ^ +call multi_mx_call.mx_call_proc(2, 0); +ERROR: syntax error at or near "call" +LINE 1: call multi_mx_call.mx_call_proc(2, 0); + ^ +-- We don't currently support colocating with reference tables +call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1); +ERROR: syntax error at or near "call" +LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ... + ^ +call multi_mx_call.mx_call_proc(2, 0); +ERROR: syntax error at or near "call" +LINE 1: call multi_mx_call.mx_call_proc(2, 0); + ^ +-- We don't currently support colocating with replicated tables +call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1); +ERROR: syntax error at or near "call" +LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ... + ^ +call multi_mx_call.mx_call_proc(2, 0); +ERROR: syntax error at or near "call" +LINE 1: call multi_mx_call.mx_call_proc(2, 0); + ^ +SET client_min_messages TO NOTICE; +drop table mx_call_dist_table_replica; +SET client_min_messages TO DEBUG1; +call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); +ERROR: syntax error at or near "call" +LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ... + ^ +-- Test that we handle transactional constructs correctly inside a procedure +-- that is routed to the workers. +CREATE PROCEDURE mx_call_proc_tx(x int) LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO mx_call_dist_table_1 VALUES (x, -1), (x+1, 4); + COMMIT; + UPDATE mx_call_dist_table_1 SET val = val+1 WHERE id >= x; + ROLLBACK; + -- Now do the final update! + UPDATE mx_call_dist_table_1 SET val = val-1 WHERE id >= x; +END;$$; ERROR: syntax error at or near "PROCEDURE" -LINE 1: DROP PROCEDURE mx_call_proc; - ^ +LINE 1: CREATE PROCEDURE mx_call_proc_tx(x int) LANGUAGE plpgsql AS ... + ^ +-- before distribution ... +CALL multi_mx_call.mx_call_proc_tx(10); +ERROR: syntax error at or near "CALL" +LINE 1: CALL multi_mx_call.mx_call_proc_tx(10); + ^ +-- after distribution ... +select create_distributed_function('mx_call_proc_tx(int)'); +ERROR: function "mx_call_proc_tx(int)" does not exist +LINE 1: select create_distributed_function('mx_call_proc_tx(int)'); + ^ +call multi_mx_call.colocate_proc_with_table('mx_call_proc_tx', 'mx_call_dist_table_1'::regclass, 0); +ERROR: syntax error at or near "call" +LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc_tx... + ^ +CALL multi_mx_call.mx_call_proc_tx(20); +ERROR: syntax error at or near "CALL" +LINE 1: CALL multi_mx_call.mx_call_proc_tx(20); + ^ +SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val; + id | val +----+----- + 3 | 1 + 3 | 5 + 4 | 5 + 6 | 5 + 9 | 2 +(5 rows) + +-- Test that we properly propagate errors raised from procedures. +CREATE PROCEDURE mx_call_proc_raise(x int) LANGUAGE plpgsql AS $$ +BEGIN + RAISE WARNING 'warning'; + RAISE EXCEPTION 'error'; +END;$$; +ERROR: syntax error at or near "PROCEDURE" +LINE 1: CREATE PROCEDURE mx_call_proc_raise(x int) LANGUAGE plpgsql ... + ^ +select create_distributed_function('mx_call_proc_raise(int)'); +ERROR: function "mx_call_proc_raise(int)" does not exist +LINE 1: select create_distributed_function('mx_call_proc_raise(int)'... + ^ +call multi_mx_call.colocate_proc_with_table('mx_call_proc_raise', 'mx_call_dist_table_1'::regclass, 0); +ERROR: syntax error at or near "call" +LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc_ra... + ^ +call multi_mx_call.mx_call_proc_raise(2); +ERROR: syntax error at or near "call" +LINE 1: call multi_mx_call.mx_call_proc_raise(2); + ^ +-- Test that we don't propagate to non-metadata worker nodes +select stop_metadata_sync_to_node('localhost', :worker_1_port); + stop_metadata_sync_to_node +---------------------------- + +(1 row) + +select stop_metadata_sync_to_node('localhost', :worker_2_port); + stop_metadata_sync_to_node +---------------------------- + +(1 row) + +call multi_mx_call.mx_call_proc(2, 0); +ERROR: syntax error at or near "call" +LINE 1: call multi_mx_call.mx_call_proc(2, 0); + ^ +SET client_min_messages TO NOTICE; +select start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + +select start_metadata_sync_to_node('localhost', :worker_2_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + +SET client_min_messages TO DEBUG1; +-- +-- Test non-const parameter values +-- +CREATE FUNCTION mx_call_add(int, int) RETURNS int + AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE; +SELECT create_distributed_function('mx_call_add(int,int)'); +DEBUG: switching to sequential query execution mode +DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands + create_distributed_function +----------------------------- + +(1 row) + +-- non-const distribution parameters cannot be pushed down +call multi_mx_call.mx_call_proc(2, mx_call_add(3, 4)); +ERROR: syntax error at or near "call" +LINE 1: call multi_mx_call.mx_call_proc(2, mx_call_add(3, 4)); + ^ +-- non-const parameter can be pushed down +call multi_mx_call.mx_call_proc(multi_mx_call.mx_call_add(3, 4), 2); +ERROR: syntax error at or near "call" +LINE 1: call multi_mx_call.mx_call_proc(multi_mx_call.mx_call_add(3,... + ^ +-- volatile parameter cannot be pushed down +call multi_mx_call.mx_call_proc(floor(random())::int, 2); +ERROR: syntax error at or near "call" +LINE 1: call multi_mx_call.mx_call_proc(floor(random())::int, 2); + ^ +-- +-- clean-up +-- +reset client_min_messages; reset citus.shard_replication_factor; reset citus.replication_model; +reset search_path; +\set VERBOSITY terse +drop schema multi_mx_call cascade; +NOTICE: drop cascades to 5 other objects +\set VERBOSITY default diff --git a/src/test/regress/sql/multi_mx_call.sql b/src/test/regress/sql/multi_mx_call.sql index b9fdaeeab..d0c454977 100644 --- a/src/test/regress/sql/multi_mx_call.sql +++ b/src/test/regress/sql/multi_mx_call.sql @@ -2,73 +2,195 @@ -- Create worker-local tables to test procedure calls were routed +set citus.shard_replication_factor to 2; +set citus.replication_model to 'statement'; + +-- This table requires specific settings, create before getting into things +create table mx_call_dist_table_replica(id int, val int); +select create_distributed_table('mx_call_dist_table_replica', 'id'); +insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5); + set citus.shard_replication_factor to 1; set citus.replication_model to 'streaming'; -CREATE TABLE mx_call_dist_table(id int); -select create_distributed_table('mx_call_dist_table', 'id'); -insert into mx_call_dist_table values (1),(2),(3),(4),(5); +create schema multi_mx_call; +set search_path to multi_mx_call, public; + +-- +-- Utility UDFs +-- + +-- 1. Marks the given procedure as colocated with the given table. +-- 2. Marks the argument index with which we route the procedure. +CREATE PROCEDURE colocate_proc_with_table(procname text, tablerelid regclass, argument_index int) +LANGUAGE plpgsql AS $$ +BEGIN + update citus.pg_dist_object + set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid + from pg_proc, pg_dist_partition + where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid; +END;$$; + + +-- +-- Create tables and procedures we want to use in tests +-- +create table mx_call_dist_table_1(id int, val int); +select create_distributed_table('mx_call_dist_table_1', 'id'); +insert into mx_call_dist_table_1 values (3,1),(4,5),(9,2),(6,5),(3,5); + +create table mx_call_dist_table_2(id int, val int); +select create_distributed_table('mx_call_dist_table_2', 'id'); +insert into mx_call_dist_table_2 values (1,1),(1,2),(2,2),(3,3),(3,4); + +create table mx_call_dist_table_ref(id int, val int); +select create_reference_table('mx_call_dist_table_ref'); +insert into mx_call_dist_table_ref values (2,7),(1,8),(2,8),(1,8),(2,8); create type mx_call_enum as enum ('A', 'S', 'D', 'F'); +create table mx_call_dist_table_enum(id int, key mx_call_enum); +select create_distributed_table('mx_call_dist_table_enum', 'key'); +insert into mx_call_dist_table_enum values (1,'S'),(2,'A'),(3,'D'),(4,'F'); -CREATE PROCEDURE mx_call_proc(x int, INOUT y int) LANGUAGE plpgsql AS $$ + +CREATE PROCEDURE mx_call_proc(x int, INOUT y int) +LANGUAGE plpgsql AS $$ BEGIN + -- groupid is 0 in coordinator and non-zero in workers, so by using it here + -- we make sure the procedure is being executed in the worker. y := x + (select case groupid when 0 then 1 else 0 end from pg_dist_local_group); - y := y + (select sum(id) from mx_call_dist_table); -END; -$$; + -- we also make sure that we can run distributed queries in the procedures + -- that are routed to the workers. + y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id); +END;$$; -CREATE PROCEDURE mx_call_proc_asdf(INOUT x mx_call_enum, INOUT y mx_call_enum) LANGUAGE plpgsql AS $$ +-- create another procedure which verifies: +-- 1. we work fine with multiple return columns +-- 2. we work fine in combination with custom types +CREATE PROCEDURE mx_call_proc_custom_types(INOUT x mx_call_enum, INOUT y mx_call_enum) +LANGUAGE plpgsql AS $$ BEGIN y := x; x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); -END; -$$; +END;$$; -CREATE FUNCTION mx_call_add(int, int) RETURNS int - AS 'select $1 + $2;' - LANGUAGE SQL - IMMUTABLE - RETURNS NULL ON NULL INPUT; - -call mx_call_proc(2, 0); -call mx_call_proc_asdf('S', 'A'); +-- Test that undistributed procedures have no issue executing +call multi_mx_call.mx_call_proc(2, 0); +call multi_mx_call.mx_call_proc_custom_types('S', 'A'); +-- Mark both procedures as distributed ... select create_distributed_function('mx_call_proc(int,int)'); -update citus.pg_dist_object -set distribution_argument_index = 1, colocationid = pg_dist_partition.colocationid -from pg_proc, pg_dist_partition -where proname = 'mx_call_proc' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass; +select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_call_enum)'); -select create_distributed_function('mx_call_proc_asdf(mx_call_enum,mx_call_enum)'); -update citus.pg_dist_object -set distribution_argument_index = 1, colocationid = pg_dist_partition.colocationid -from pg_proc, pg_dist_partition -where proname = 'mx_call_proc_asdf' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass; +-- We still don't route them to the workers, because they aren't +-- colocated with any distributed tables. +SET client_min_messages TO DEBUG1; +call multi_mx_call.mx_call_proc(2, 0); +call multi_mx_call.mx_call_proc_custom_types('S', 'A'); -call mx_call_proc(2, 0); -call mx_call_proc_asdf('S', 'A'); +-- Mark them as colocated with a table. Now we should route them to workers. +call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); +call multi_mx_call.colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1); +call multi_mx_call.mx_call_proc(2, 0); +call multi_mx_call.mx_call_proc_custom_types('S', 'A'); -set client_min_messages to DEBUG2; +-- We don't allow distributing calls inside transactions begin; -select sum(id) from mx_call_dist_table; -call mx_call_proc(2, 0); +call multi_mx_call.mx_call_proc(2, 0); commit; -update citus.pg_dist_object -set distribution_argument_index = -1, colocationid = pg_dist_partition.colocationid -from pg_proc, pg_dist_partition -where proname = 'mx_call_proc' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass; -call mx_call_proc(2, 0); -update citus.pg_dist_object -set distribution_argument_index = 1, colocationid = pg_dist_partition.colocationid -from pg_proc, pg_dist_partition -where proname = 'mx_call_proc' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass; +-- Drop the table colocated with mx_call_proc_custom_types. Now it shouldn't +-- be routed to workers anymore. +SET client_min_messages TO NOTICE; +drop table mx_call_dist_table_enum; +SET client_min_messages TO DEBUG1; +call multi_mx_call.mx_call_proc_custom_types('S', 'A'); -call mx_call_proc(2, mx_call_add(3, 4)); +-- Make sure we do bounds checking on distributed argument index +-- This also tests that we have cache invalidation for pg_dist_object updates +call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1); +call multi_mx_call.mx_call_proc(2, 0); +call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2); +call multi_mx_call.mx_call_proc(2, 0); +-- We don't currently support colocating with reference tables +call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1); +call multi_mx_call.mx_call_proc(2, 0); + +-- We don't currently support colocating with replicated tables +call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1); +call multi_mx_call.mx_call_proc(2, 0); +SET client_min_messages TO NOTICE; +drop table mx_call_dist_table_replica; +SET client_min_messages TO DEBUG1; + +call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); + +-- Test that we handle transactional constructs correctly inside a procedure +-- that is routed to the workers. +CREATE PROCEDURE mx_call_proc_tx(x int) LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO mx_call_dist_table_1 VALUES (x, -1), (x+1, 4); + COMMIT; + UPDATE mx_call_dist_table_1 SET val = val+1 WHERE id >= x; + ROLLBACK; + -- Now do the final update! + UPDATE mx_call_dist_table_1 SET val = val-1 WHERE id >= x; +END;$$; + +-- before distribution ... +CALL multi_mx_call.mx_call_proc_tx(10); +-- after distribution ... +select create_distributed_function('mx_call_proc_tx(int)'); +call multi_mx_call.colocate_proc_with_table('mx_call_proc_tx', 'mx_call_dist_table_1'::regclass, 0); +CALL multi_mx_call.mx_call_proc_tx(20); +SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val; + +-- Test that we properly propagate errors raised from procedures. +CREATE PROCEDURE mx_call_proc_raise(x int) LANGUAGE plpgsql AS $$ +BEGIN + RAISE WARNING 'warning'; + RAISE EXCEPTION 'error'; +END;$$; +select create_distributed_function('mx_call_proc_raise(int)'); +call multi_mx_call.colocate_proc_with_table('mx_call_proc_raise', 'mx_call_dist_table_1'::regclass, 0); +call multi_mx_call.mx_call_proc_raise(2); + + +-- Test that we don't propagate to non-metadata worker nodes +select stop_metadata_sync_to_node('localhost', :worker_1_port); +select stop_metadata_sync_to_node('localhost', :worker_2_port); +call multi_mx_call.mx_call_proc(2, 0); +SET client_min_messages TO NOTICE; +select start_metadata_sync_to_node('localhost', :worker_1_port); +select start_metadata_sync_to_node('localhost', :worker_2_port); +SET client_min_messages TO DEBUG1; + +-- +-- Test non-const parameter values +-- +CREATE FUNCTION mx_call_add(int, int) RETURNS int + AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE; +SELECT create_distributed_function('mx_call_add(int,int)'); + +-- non-const distribution parameters cannot be pushed down +call multi_mx_call.mx_call_proc(2, mx_call_add(3, 4)); + +-- non-const parameter can be pushed down +call multi_mx_call.mx_call_proc(multi_mx_call.mx_call_add(3, 4), 2); + +-- volatile parameter cannot be pushed down +call multi_mx_call.mx_call_proc(floor(random())::int, 2); + +-- +-- clean-up +-- reset client_min_messages; -DROP TABLE mx_call_dist_table; -DROP PROCEDURE mx_call_proc; reset citus.shard_replication_factor; reset citus.replication_model; +reset search_path; + +\set VERBOSITY terse +drop schema multi_mx_call cascade; +\set VERBOSITY default +