mirror of https://github.com/citusdata/citus.git
Annotated tests for multi_mx_call.
Co-authored-by: pykello <hadi.moshayedi@microsoft.com>pull/3026/head
parent
e269d990c9
commit
90e1f1442a
|
@ -81,27 +81,27 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
|
|||
|
||||
if (IsMultiStatementTransaction())
|
||||
{
|
||||
ereport(DEBUG2, (errmsg("cannot push down CALL in multi-statement transaction")));
|
||||
ereport(DEBUG1, (errmsg("cannot push down CALL in multi-statement transaction")));
|
||||
return false;
|
||||
}
|
||||
|
||||
colocatedRelationId = ColocatedTableId(procedure->colocationId);
|
||||
if (colocatedRelationId == InvalidOid)
|
||||
{
|
||||
ereport(DEBUG2, (errmsg("stored procedure does not have co-located tables")));
|
||||
ereport(DEBUG1, (errmsg("stored procedure does not have co-located tables")));
|
||||
return false;
|
||||
}
|
||||
|
||||
if (procedure->distributionArgIndex < 0 ||
|
||||
procedure->distributionArgIndex >= list_length(funcExpr->args))
|
||||
{
|
||||
ereport(DEBUG2, (errmsg("cannot push down invalid distribution_argument_index")));
|
||||
ereport(DEBUG1, (errmsg("cannot push down invalid distribution_argument_index")));
|
||||
return false;
|
||||
}
|
||||
|
||||
if (contain_volatile_functions((Node *) funcExpr->args))
|
||||
{
|
||||
ereport(DEBUG2, (errmsg("arguments in a distributed stored procedure must "
|
||||
ereport(DEBUG1, (errmsg("arguments in a distributed stored procedure must "
|
||||
"be constant expressions")));
|
||||
return false;
|
||||
}
|
||||
|
@ -119,7 +119,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
|
|||
partitionValue = (Const *) list_nth(funcExpr->args, procedure->distributionArgIndex);
|
||||
if (!IsA(partitionValue, Const))
|
||||
{
|
||||
ereport(DEBUG2, (errmsg("distribution argument value must be a constant")));
|
||||
ereport(DEBUG1, (errmsg("distribution argument value must be a constant")));
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -137,7 +137,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
|
|||
shardInterval = FindShardInterval(partitionValueDatum, distTable);
|
||||
if (shardInterval == NULL)
|
||||
{
|
||||
ereport(DEBUG2, (errmsg("cannot push down call, failed to find shard interval")));
|
||||
ereport(DEBUG1, (errmsg("cannot push down call, failed to find shard interval")));
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -154,10 +154,12 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
|
|||
workerNode = FindWorkerNode(placement->nodeName, placement->nodePort);
|
||||
if (workerNode == NULL || !workerNode->hasMetadata || !workerNode->metadataSynced)
|
||||
{
|
||||
ereport(DEBUG2, (errmsg("there is no worker node with metadata")));
|
||||
ereport(DEBUG1, (errmsg("there is no worker node with metadata")));
|
||||
return false;
|
||||
}
|
||||
|
||||
ereport(DEBUG1, (errmsg("pushing down the procedure")));
|
||||
|
||||
{
|
||||
Tuplestorestate *tupleStore = tuplestore_begin_heap(true, false, work_mem);
|
||||
TupleDesc tupleDesc = CallStmtResultDesc(callStmt);
|
||||
|
|
|
@ -1,172 +1,405 @@
|
|||
-- Test passing off CALL to mx workers
|
||||
-- Create worker-local tables to test procedure calls were routed
|
||||
set citus.shard_replication_factor to 1;
|
||||
set citus.replication_model to 'streaming';
|
||||
CREATE TABLE mx_call_dist_table(id int);
|
||||
select create_distributed_table('mx_call_dist_table', 'id');
|
||||
set citus.shard_replication_factor to 2;
|
||||
set citus.replication_model to 'statement';
|
||||
-- This table requires specific settings, create before getting into things
|
||||
create table mx_call_dist_table_replica(id int, val int);
|
||||
select create_distributed_table('mx_call_dist_table_replica', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
insert into mx_call_dist_table values (1),(2),(3),(4),(5);
|
||||
create type mx_call_enum as enum ('A', 'S', 'D', 'F');
|
||||
CREATE PROCEDURE mx_call_proc(x int, INOUT y int) LANGUAGE plpgsql AS $$
|
||||
insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5);
|
||||
set citus.shard_replication_factor to 1;
|
||||
set citus.replication_model to 'streaming';
|
||||
create schema multi_mx_call;
|
||||
set search_path to multi_mx_call, public;
|
||||
--
|
||||
-- Utility UDFs
|
||||
--
|
||||
-- 1. Marks the given procedure as colocated with the given table.
|
||||
-- 2. Marks the argument index with which we route the procedure.
|
||||
CREATE PROCEDURE colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
|
||||
LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
update citus.pg_dist_object
|
||||
set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid
|
||||
from pg_proc, pg_dist_partition
|
||||
where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid;
|
||||
END;$$;
|
||||
--
|
||||
-- Create tables and procedures we want to use in tests
|
||||
--
|
||||
create table mx_call_dist_table_1(id int, val int);
|
||||
select create_distributed_table('mx_call_dist_table_1', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
insert into mx_call_dist_table_1 values (3,1),(4,5),(9,2),(6,5),(3,5);
|
||||
create table mx_call_dist_table_2(id int, val int);
|
||||
select create_distributed_table('mx_call_dist_table_2', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
insert into mx_call_dist_table_2 values (1,1),(1,2),(2,2),(3,3),(3,4);
|
||||
create table mx_call_dist_table_ref(id int, val int);
|
||||
select create_reference_table('mx_call_dist_table_ref');
|
||||
create_reference_table
|
||||
------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
insert into mx_call_dist_table_ref values (2,7),(1,8),(2,8),(1,8),(2,8);
|
||||
create type mx_call_enum as enum ('A', 'S', 'D', 'F');
|
||||
create table mx_call_dist_table_enum(id int, key mx_call_enum);
|
||||
select create_distributed_table('mx_call_dist_table_enum', 'key');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
insert into mx_call_dist_table_enum values (1,'S'),(2,'A'),(3,'D'),(4,'F');
|
||||
CREATE PROCEDURE mx_call_proc(x int, INOUT y int)
|
||||
LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
-- groupid is 0 in coordinator and non-zero in workers, so by using it here
|
||||
-- we make sure the procedure is being executed in the worker.
|
||||
y := x + (select case groupid when 0 then 1 else 0 end from pg_dist_local_group);
|
||||
y := y + (select sum(id) from mx_call_dist_table);
|
||||
END;
|
||||
$$;
|
||||
CREATE PROCEDURE mx_call_proc_asdf(INOUT x mx_call_enum, INOUT y mx_call_enum) LANGUAGE plpgsql AS $$
|
||||
-- we also make sure that we can run distributed queries in the procedures
|
||||
-- that are routed to the workers.
|
||||
y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id);
|
||||
END;$$;
|
||||
-- create another procedure which verifies:
|
||||
-- 1. we work fine with multiple return columns
|
||||
-- 2. we work fine in combination with custom types
|
||||
CREATE PROCEDURE mx_call_proc_custom_types(INOUT x mx_call_enum, INOUT y mx_call_enum)
|
||||
LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
y := x;
|
||||
x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group);
|
||||
END;
|
||||
$$;
|
||||
CREATE FUNCTION mx_call_add(int, int) RETURNS int
|
||||
AS 'select $1 + $2;'
|
||||
LANGUAGE SQL
|
||||
IMMUTABLE
|
||||
RETURNS NULL ON NULL INPUT;
|
||||
call mx_call_proc(2, 0);
|
||||
END;$$;
|
||||
-- Test that undistributed procedures have no issue executing
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
y
|
||||
----
|
||||
18
|
||||
29
|
||||
(1 row)
|
||||
|
||||
call mx_call_proc_asdf('S', 'A');
|
||||
call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
||||
x | y
|
||||
---+---
|
||||
F | S
|
||||
(1 row)
|
||||
|
||||
-- Mark both procedures as distributed ...
|
||||
select create_distributed_function('mx_call_proc(int,int)');
|
||||
create_distributed_function
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
update citus.pg_dist_object
|
||||
set distribution_argument_index = 1, colocationid = pg_dist_partition.colocationid
|
||||
from pg_proc, pg_dist_partition
|
||||
where proname = 'mx_call_proc' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass;
|
||||
select create_distributed_function('mx_call_proc_asdf(mx_call_enum,mx_call_enum)');
|
||||
select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_call_enum)');
|
||||
create_distributed_function
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
update citus.pg_dist_object
|
||||
set distribution_argument_index = 1, colocationid = pg_dist_partition.colocationid
|
||||
from pg_proc, pg_dist_partition
|
||||
where proname = 'mx_call_proc_asdf' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass;
|
||||
call mx_call_proc(2, 0);
|
||||
-- We still don't route them to the workers, because they aren't
|
||||
-- colocated with any distributed tables.
|
||||
SET client_min_messages TO DEBUG1;
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
DEBUG: stored procedure does not have co-located tables
|
||||
DEBUG: generating subplan 8_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 8 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
y
|
||||
----
|
||||
17
|
||||
29
|
||||
(1 row)
|
||||
|
||||
call mx_call_proc_asdf('S', 'A');
|
||||
call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
||||
DEBUG: stored procedure does not have co-located tables
|
||||
x | y
|
||||
---+---
|
||||
F | S
|
||||
(1 row)
|
||||
|
||||
-- Mark them as colocated with a table. Now we should route them to workers.
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
DEBUG: pushing down the procedure
|
||||
y
|
||||
----
|
||||
28
|
||||
(1 row)
|
||||
|
||||
call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
||||
DEBUG: pushing down the procedure
|
||||
x | y
|
||||
---+---
|
||||
S | S
|
||||
(1 row)
|
||||
|
||||
set client_min_messages to DEBUG2;
|
||||
-- We don't allow distributing calls inside transactions
|
||||
begin;
|
||||
select sum(id) from mx_call_dist_table;
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
sum
|
||||
-----
|
||||
15
|
||||
(1 row)
|
||||
|
||||
call mx_call_proc(2, 0);
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
DEBUG: cannot push down CALL in multi-statement transaction
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment
|
||||
DEBUG: generating subplan 5_1 for subquery SELECT sum(id) AS sum FROM public.mx_call_dist_table
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment
|
||||
DEBUG: Plan 5 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('5_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment
|
||||
DEBUG: Creating router plan
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment
|
||||
DEBUG: Plan is router executable
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment
|
||||
DEBUG: generating subplan 10_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
y
|
||||
----
|
||||
18
|
||||
29
|
||||
(1 row)
|
||||
|
||||
commit;
|
||||
update citus.pg_dist_object
|
||||
set distribution_argument_index = -1, colocationid = pg_dist_partition.colocationid
|
||||
from pg_proc, pg_dist_partition
|
||||
where proname = 'mx_call_proc' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass;
|
||||
call mx_call_proc(2, 0);
|
||||
-- Drop the table colocated with mx_call_proc_custom_types. Now it shouldn't
|
||||
-- be routed to workers anymore.
|
||||
SET client_min_messages TO NOTICE;
|
||||
drop table mx_call_dist_table_enum;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
||||
DEBUG: stored procedure does not have co-located tables
|
||||
x | y
|
||||
---+---
|
||||
F | S
|
||||
(1 row)
|
||||
|
||||
-- Make sure we do bounds checking on distributed argument index
|
||||
-- This also tests that we have cache invalidation for pg_dist_object updates
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1);
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
DEBUG: cannot push down invalid distribution_argument_index
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment
|
||||
DEBUG: generating subplan 7_1 for subquery SELECT sum(id) AS sum FROM public.mx_call_dist_table
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment
|
||||
DEBUG: Plan 7 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('7_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment
|
||||
DEBUG: Creating router plan
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment
|
||||
DEBUG: Plan is router executable
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment
|
||||
DEBUG: generating subplan 12_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 12 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('12_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
y
|
||||
----
|
||||
18
|
||||
29
|
||||
(1 row)
|
||||
|
||||
update citus.pg_dist_object
|
||||
set distribution_argument_index = 1, colocationid = pg_dist_partition.colocationid
|
||||
from pg_proc, pg_dist_partition
|
||||
where proname = 'mx_call_proc' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass;
|
||||
call mx_call_proc(2, mx_call_add(3, 4));
|
||||
DEBUG: cannot push down non-constant argument value
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment
|
||||
DEBUG: generating subplan 9_1 for subquery SELECT sum(id) AS sum FROM public.mx_call_dist_table
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment
|
||||
DEBUG: Plan 9 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('9_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment
|
||||
DEBUG: Creating router plan
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment
|
||||
DEBUG: Plan is router executable
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(id) from mx_call_dist_table)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 4 at assignment
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2);
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
DEBUG: cannot push down invalid distribution_argument_index
|
||||
DEBUG: generating subplan 14_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 14 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('14_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
y
|
||||
----
|
||||
18
|
||||
29
|
||||
(1 row)
|
||||
|
||||
-- We don't currently support colocating with reference tables
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1);
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
DEBUG: cannot push down CALL for reference tables
|
||||
DEBUG: generating subplan 17_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 17 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('17_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
y
|
||||
----
|
||||
29
|
||||
(1 row)
|
||||
|
||||
-- We don't currently support colocating with replicated tables
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1);
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
DEBUG: cannot push down CALL for replicated distributed tables
|
||||
DEBUG: generating subplan 19_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 19 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('19_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
y
|
||||
----
|
||||
29
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO NOTICE;
|
||||
drop table mx_call_dist_table_replica;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
|
||||
-- Test that we handle transactional constructs correctly inside a procedure
|
||||
-- that is routed to the workers.
|
||||
CREATE PROCEDURE mx_call_proc_tx(x int) LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
INSERT INTO mx_call_dist_table_1 VALUES (x, -1), (x+1, 4);
|
||||
COMMIT;
|
||||
UPDATE mx_call_dist_table_1 SET val = val+1 WHERE id >= x;
|
||||
ROLLBACK;
|
||||
-- Now do the final update!
|
||||
UPDATE mx_call_dist_table_1 SET val = val-1 WHERE id >= x;
|
||||
END;$$;
|
||||
-- before distribution ...
|
||||
CALL multi_mx_call.mx_call_proc_tx(10);
|
||||
-- after distribution ...
|
||||
select create_distributed_function('mx_call_proc_tx(int)');
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
|
||||
create_distributed_function
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc_tx', 'mx_call_dist_table_1'::regclass, 0);
|
||||
CALL multi_mx_call.mx_call_proc_tx(20);
|
||||
DEBUG: pushing down the procedure
|
||||
ERROR: relation "mx_call_dist_table_1" does not exist
|
||||
CONTEXT: while executing command on localhost:57637
|
||||
PL/pgSQL function multi_mx_call.mx_call_proc_tx(integer) line 3 at SQL statement
|
||||
SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val;
|
||||
id | val
|
||||
----+-----
|
||||
3 | 1
|
||||
3 | 5
|
||||
4 | 5
|
||||
6 | 5
|
||||
9 | 2
|
||||
10 | -2
|
||||
11 | 3
|
||||
(7 rows)
|
||||
|
||||
-- Test that we properly propagate errors raised from procedures.
|
||||
CREATE PROCEDURE mx_call_proc_raise(x int) LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
RAISE WARNING 'warning';
|
||||
RAISE EXCEPTION 'error';
|
||||
END;$$;
|
||||
select create_distributed_function('mx_call_proc_raise(int)');
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
|
||||
create_distributed_function
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc_raise', 'mx_call_dist_table_1'::regclass, 0);
|
||||
call multi_mx_call.mx_call_proc_raise(2);
|
||||
DEBUG: pushing down the procedure
|
||||
DEBUG: warning
|
||||
DETAIL: WARNING from localhost:57638
|
||||
ERROR: error
|
||||
CONTEXT: while executing command on localhost:57638
|
||||
PL/pgSQL function multi_mx_call.mx_call_proc_raise(integer) line 4 at RAISE
|
||||
-- Test that we don't propagate to non-metadata worker nodes
|
||||
select stop_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
stop_metadata_sync_to_node
|
||||
----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select stop_metadata_sync_to_node('localhost', :worker_2_port);
|
||||
stop_metadata_sync_to_node
|
||||
----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
DEBUG: there is no worker node with metadata
|
||||
DEBUG: generating subplan 25_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 25 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('25_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
y
|
||||
----
|
||||
29
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO NOTICE;
|
||||
select start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
start_metadata_sync_to_node
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select start_metadata_sync_to_node('localhost', :worker_2_port);
|
||||
start_metadata_sync_to_node
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
--
|
||||
-- Test non-const parameter values
|
||||
--
|
||||
CREATE FUNCTION mx_call_add(int, int) RETURNS int
|
||||
AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE;
|
||||
SELECT create_distributed_function('mx_call_add(int,int)');
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
|
||||
create_distributed_function
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- non-const distribution parameters cannot be pushed down
|
||||
call multi_mx_call.mx_call_proc(2, mx_call_add(3, 4));
|
||||
DEBUG: distribution argument value must be a constant
|
||||
DEBUG: generating subplan 27_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 27 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('27_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
y
|
||||
----
|
||||
29
|
||||
(1 row)
|
||||
|
||||
-- non-const parameter can be pushed down
|
||||
call multi_mx_call.mx_call_proc(multi_mx_call.mx_call_add(3, 4), 2);
|
||||
DEBUG: pushing down the procedure
|
||||
y
|
||||
----
|
||||
33
|
||||
(1 row)
|
||||
|
||||
-- volatile parameter cannot be pushed down
|
||||
call multi_mx_call.mx_call_proc(floor(random())::int, 2);
|
||||
DEBUG: arguments in a distributed stored procedure must be constant expressions
|
||||
DEBUG: generating subplan 29_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 29 query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('29_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
y
|
||||
----
|
||||
27
|
||||
(1 row)
|
||||
|
||||
--
|
||||
-- clean-up
|
||||
--
|
||||
reset client_min_messages;
|
||||
DROP TABLE mx_call_dist_table;
|
||||
DROP PROCEDURE mx_call_proc;
|
||||
reset citus.shard_replication_factor;
|
||||
reset citus.replication_model;
|
||||
reset search_path;
|
||||
\set VERBOSITY terse
|
||||
drop schema multi_mx_call cascade;
|
||||
NOTICE: drop cascades to 10 other objects
|
||||
\set VERBOSITY default
|
||||
|
|
|
@ -1,106 +1,332 @@
|
|||
-- Test passing off CALL to mx workers
|
||||
-- Create worker-local tables to test procedure calls were routed
|
||||
set citus.shard_replication_factor to 1;
|
||||
set citus.replication_model to 'streaming';
|
||||
CREATE TABLE mx_call_dist_table(id int);
|
||||
select create_distributed_table('mx_call_dist_table', 'id');
|
||||
set citus.shard_replication_factor to 2;
|
||||
set citus.replication_model to 'statement';
|
||||
-- This table requires specific settings, create before getting into things
|
||||
create table mx_call_dist_table_replica(id int, val int);
|
||||
select create_distributed_table('mx_call_dist_table_replica', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
insert into mx_call_dist_table values (1),(2),(3),(4),(5);
|
||||
create type mx_call_enum as enum ('A', 'S', 'D', 'F');
|
||||
CREATE PROCEDURE mx_call_proc(x int, INOUT y int) LANGUAGE plpgsql AS $$
|
||||
insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5);
|
||||
set citus.shard_replication_factor to 1;
|
||||
set citus.replication_model to 'streaming';
|
||||
create schema multi_mx_call;
|
||||
set search_path to multi_mx_call, public;
|
||||
--
|
||||
-- Utility UDFs
|
||||
--
|
||||
-- 1. Marks the given procedure as colocated with the given table.
|
||||
-- 2. Marks the argument index with which we route the procedure.
|
||||
CREATE PROCEDURE colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
|
||||
LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
y := x + (select case groupid when 0 then 1 else 0 end from pg_dist_local_group);
|
||||
y := y + (select sum(id) from mx_call_dist_table);
|
||||
END;
|
||||
$$;
|
||||
update citus.pg_dist_object
|
||||
set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid
|
||||
from pg_proc, pg_dist_partition
|
||||
where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid;
|
||||
END;$$;
|
||||
ERROR: syntax error at or near "PROCEDURE"
|
||||
LINE 1: CREATE PROCEDURE mx_call_proc(x int, INOUT y int) LANGUAGE p...
|
||||
LINE 1: CREATE PROCEDURE colocate_proc_with_table(procname text, tab...
|
||||
^
|
||||
CREATE PROCEDURE mx_call_proc_asdf(INOUT x mx_call_enum, INOUT y mx_call_enum) LANGUAGE plpgsql AS $$
|
||||
--
|
||||
-- Create tables and procedures we want to use in tests
|
||||
--
|
||||
create table mx_call_dist_table_1(id int, val int);
|
||||
select create_distributed_table('mx_call_dist_table_1', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
insert into mx_call_dist_table_1 values (3,1),(4,5),(9,2),(6,5),(3,5);
|
||||
create table mx_call_dist_table_2(id int, val int);
|
||||
select create_distributed_table('mx_call_dist_table_2', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
insert into mx_call_dist_table_2 values (1,1),(1,2),(2,2),(3,3),(3,4);
|
||||
create table mx_call_dist_table_ref(id int, val int);
|
||||
select create_reference_table('mx_call_dist_table_ref');
|
||||
create_reference_table
|
||||
------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
insert into mx_call_dist_table_ref values (2,7),(1,8),(2,8),(1,8),(2,8);
|
||||
create type mx_call_enum as enum ('A', 'S', 'D', 'F');
|
||||
create table mx_call_dist_table_enum(id int, key mx_call_enum);
|
||||
select create_distributed_table('mx_call_dist_table_enum', 'key');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
insert into mx_call_dist_table_enum values (1,'S'),(2,'A'),(3,'D'),(4,'F');
|
||||
CREATE PROCEDURE mx_call_proc(x int, INOUT y int)
|
||||
LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
-- groupid is 0 in coordinator and non-zero in workers, so by using it here
|
||||
-- we make sure the procedure is being executed in the worker.
|
||||
y := x + (select case groupid when 0 then 1 else 0 end from pg_dist_local_group);
|
||||
-- we also make sure that we can run distributed queries in the procedures
|
||||
-- that are routed to the workers.
|
||||
y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id);
|
||||
END;$$;
|
||||
ERROR: syntax error at or near "PROCEDURE"
|
||||
LINE 1: CREATE PROCEDURE mx_call_proc(x int, INOUT y int)
|
||||
^
|
||||
-- create another procedure which verifies:
|
||||
-- 1. we work fine with multiple return columns
|
||||
-- 2. we work fine in combination with custom types
|
||||
CREATE PROCEDURE mx_call_proc_custom_types(INOUT x mx_call_enum, INOUT y mx_call_enum)
|
||||
LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
y := x;
|
||||
x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group);
|
||||
END;
|
||||
$$;
|
||||
END;$$;
|
||||
ERROR: syntax error at or near "PROCEDURE"
|
||||
LINE 1: CREATE PROCEDURE mx_call_proc_asdf(INOUT x mx_call_enum, INO...
|
||||
LINE 1: CREATE PROCEDURE mx_call_proc_custom_types(INOUT x mx_call_e...
|
||||
^
|
||||
CREATE FUNCTION mx_call_add(int, int) RETURNS int
|
||||
AS 'select $1 + $2;'
|
||||
LANGUAGE SQL
|
||||
IMMUTABLE
|
||||
RETURNS NULL ON NULL INPUT;
|
||||
call mx_call_proc(2, 0);
|
||||
-- Test that undistributed procedures have no issue executing
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call mx_call_proc(2, 0);
|
||||
LINE 1: call multi_mx_call.mx_call_proc(2, 0);
|
||||
^
|
||||
call mx_call_proc_asdf('S', 'A');
|
||||
call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call mx_call_proc_asdf('S', 'A');
|
||||
LINE 1: call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
||||
^
|
||||
-- Mark both procedures as distributed ...
|
||||
select create_distributed_function('mx_call_proc(int,int)');
|
||||
ERROR: function "mx_call_proc(int,int)" does not exist
|
||||
LINE 1: select create_distributed_function('mx_call_proc(int,int)');
|
||||
^
|
||||
update citus.pg_dist_object
|
||||
set distribution_argument_index = 1, colocationid = pg_dist_partition.colocationid
|
||||
from pg_proc, pg_dist_partition
|
||||
where proname = 'mx_call_proc' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass;
|
||||
select create_distributed_function('mx_call_proc_asdf(mx_call_enum,mx_call_enum)');
|
||||
ERROR: function "mx_call_proc_asdf(mx_call_enum,mx_call_enum)" does not exist
|
||||
LINE 1: select create_distributed_function('mx_call_proc_asdf(mx_cal...
|
||||
select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_call_enum)');
|
||||
ERROR: function "mx_call_proc_custom_types(mx_call_enum,mx_call_enum)" does not exist
|
||||
LINE 1: select create_distributed_function('mx_call_proc_custom_type...
|
||||
^
|
||||
update citus.pg_dist_object
|
||||
set distribution_argument_index = 1, colocationid = pg_dist_partition.colocationid
|
||||
from pg_proc, pg_dist_partition
|
||||
where proname = 'mx_call_proc_asdf' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass;
|
||||
call mx_call_proc(2, 0);
|
||||
-- We still don't route them to the workers, because they aren't
|
||||
-- colocated with any distributed tables.
|
||||
SET client_min_messages TO DEBUG1;
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call mx_call_proc(2, 0);
|
||||
LINE 1: call multi_mx_call.mx_call_proc(2, 0);
|
||||
^
|
||||
call mx_call_proc_asdf('S', 'A');
|
||||
call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call mx_call_proc_asdf('S', 'A');
|
||||
LINE 1: call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
||||
^
|
||||
set client_min_messages to DEBUG2;
|
||||
-- Mark them as colocated with a table. Now we should route them to workers.
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ...
|
||||
^
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc_cu...
|
||||
^
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.mx_call_proc(2, 0);
|
||||
^
|
||||
call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
||||
^
|
||||
-- We don't allow distributing calls inside transactions
|
||||
begin;
|
||||
select sum(id) from mx_call_dist_table;
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
sum
|
||||
-----
|
||||
15
|
||||
(1 row)
|
||||
|
||||
call mx_call_proc(2, 0);
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call mx_call_proc(2, 0);
|
||||
LINE 1: call multi_mx_call.mx_call_proc(2, 0);
|
||||
^
|
||||
commit;
|
||||
update citus.pg_dist_object
|
||||
set distribution_argument_index = -1, colocationid = pg_dist_partition.colocationid
|
||||
from pg_proc, pg_dist_partition
|
||||
where proname = 'mx_call_proc' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass;
|
||||
call mx_call_proc(2, 0);
|
||||
-- Drop the table colocated with mx_call_proc_custom_types. Now it shouldn't
|
||||
-- be routed to workers anymore.
|
||||
SET client_min_messages TO NOTICE;
|
||||
drop table mx_call_dist_table_enum;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call mx_call_proc(2, 0);
|
||||
LINE 1: call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
||||
^
|
||||
update citus.pg_dist_object
|
||||
set distribution_argument_index = 1, colocationid = pg_dist_partition.colocationid
|
||||
from pg_proc, pg_dist_partition
|
||||
where proname = 'mx_call_proc' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass;
|
||||
call mx_call_proc(2, mx_call_add(3, 4));
|
||||
-- Make sure we do bounds checking on distributed argument index
|
||||
-- This also tests that we have cache invalidation for pg_dist_object updates
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call mx_call_proc(2, mx_call_add(3, 4));
|
||||
LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ...
|
||||
^
|
||||
reset client_min_messages;
|
||||
DROP TABLE mx_call_dist_table;
|
||||
DROP PROCEDURE mx_call_proc;
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.mx_call_proc(2, 0);
|
||||
^
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ...
|
||||
^
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.mx_call_proc(2, 0);
|
||||
^
|
||||
-- We don't currently support colocating with reference tables
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ...
|
||||
^
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.mx_call_proc(2, 0);
|
||||
^
|
||||
-- We don't currently support colocating with replicated tables
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ...
|
||||
^
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.mx_call_proc(2, 0);
|
||||
^
|
||||
SET client_min_messages TO NOTICE;
|
||||
drop table mx_call_dist_table_replica;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ...
|
||||
^
|
||||
-- Test that we handle transactional constructs correctly inside a procedure
|
||||
-- that is routed to the workers.
|
||||
CREATE PROCEDURE mx_call_proc_tx(x int) LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
INSERT INTO mx_call_dist_table_1 VALUES (x, -1), (x+1, 4);
|
||||
COMMIT;
|
||||
UPDATE mx_call_dist_table_1 SET val = val+1 WHERE id >= x;
|
||||
ROLLBACK;
|
||||
-- Now do the final update!
|
||||
UPDATE mx_call_dist_table_1 SET val = val-1 WHERE id >= x;
|
||||
END;$$;
|
||||
ERROR: syntax error at or near "PROCEDURE"
|
||||
LINE 1: DROP PROCEDURE mx_call_proc;
|
||||
^
|
||||
LINE 1: CREATE PROCEDURE mx_call_proc_tx(x int) LANGUAGE plpgsql AS ...
|
||||
^
|
||||
-- before distribution ...
|
||||
CALL multi_mx_call.mx_call_proc_tx(10);
|
||||
ERROR: syntax error at or near "CALL"
|
||||
LINE 1: CALL multi_mx_call.mx_call_proc_tx(10);
|
||||
^
|
||||
-- after distribution ...
|
||||
select create_distributed_function('mx_call_proc_tx(int)');
|
||||
ERROR: function "mx_call_proc_tx(int)" does not exist
|
||||
LINE 1: select create_distributed_function('mx_call_proc_tx(int)');
|
||||
^
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc_tx', 'mx_call_dist_table_1'::regclass, 0);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc_tx...
|
||||
^
|
||||
CALL multi_mx_call.mx_call_proc_tx(20);
|
||||
ERROR: syntax error at or near "CALL"
|
||||
LINE 1: CALL multi_mx_call.mx_call_proc_tx(20);
|
||||
^
|
||||
SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val;
|
||||
id | val
|
||||
----+-----
|
||||
3 | 1
|
||||
3 | 5
|
||||
4 | 5
|
||||
6 | 5
|
||||
9 | 2
|
||||
(5 rows)
|
||||
|
||||
-- Test that we properly propagate errors raised from procedures.
|
||||
CREATE PROCEDURE mx_call_proc_raise(x int) LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
RAISE WARNING 'warning';
|
||||
RAISE EXCEPTION 'error';
|
||||
END;$$;
|
||||
ERROR: syntax error at or near "PROCEDURE"
|
||||
LINE 1: CREATE PROCEDURE mx_call_proc_raise(x int) LANGUAGE plpgsql ...
|
||||
^
|
||||
select create_distributed_function('mx_call_proc_raise(int)');
|
||||
ERROR: function "mx_call_proc_raise(int)" does not exist
|
||||
LINE 1: select create_distributed_function('mx_call_proc_raise(int)'...
|
||||
^
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc_raise', 'mx_call_dist_table_1'::regclass, 0);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc_ra...
|
||||
^
|
||||
call multi_mx_call.mx_call_proc_raise(2);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.mx_call_proc_raise(2);
|
||||
^
|
||||
-- Test that we don't propagate to non-metadata worker nodes
|
||||
select stop_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
stop_metadata_sync_to_node
|
||||
----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select stop_metadata_sync_to_node('localhost', :worker_2_port);
|
||||
stop_metadata_sync_to_node
|
||||
----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.mx_call_proc(2, 0);
|
||||
^
|
||||
SET client_min_messages TO NOTICE;
|
||||
select start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
start_metadata_sync_to_node
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select start_metadata_sync_to_node('localhost', :worker_2_port);
|
||||
start_metadata_sync_to_node
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
--
|
||||
-- Test non-const parameter values
|
||||
--
|
||||
CREATE FUNCTION mx_call_add(int, int) RETURNS int
|
||||
AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE;
|
||||
SELECT create_distributed_function('mx_call_add(int,int)');
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
|
||||
create_distributed_function
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- non-const distribution parameters cannot be pushed down
|
||||
call multi_mx_call.mx_call_proc(2, mx_call_add(3, 4));
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.mx_call_proc(2, mx_call_add(3, 4));
|
||||
^
|
||||
-- non-const parameter can be pushed down
|
||||
call multi_mx_call.mx_call_proc(multi_mx_call.mx_call_add(3, 4), 2);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.mx_call_proc(multi_mx_call.mx_call_add(3,...
|
||||
^
|
||||
-- volatile parameter cannot be pushed down
|
||||
call multi_mx_call.mx_call_proc(floor(random())::int, 2);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.mx_call_proc(floor(random())::int, 2);
|
||||
^
|
||||
--
|
||||
-- clean-up
|
||||
--
|
||||
reset client_min_messages;
|
||||
reset citus.shard_replication_factor;
|
||||
reset citus.replication_model;
|
||||
reset search_path;
|
||||
\set VERBOSITY terse
|
||||
drop schema multi_mx_call cascade;
|
||||
NOTICE: drop cascades to 5 other objects
|
||||
\set VERBOSITY default
|
||||
|
|
|
@ -2,73 +2,195 @@
|
|||
|
||||
-- Create worker-local tables to test procedure calls were routed
|
||||
|
||||
set citus.shard_replication_factor to 2;
|
||||
set citus.replication_model to 'statement';
|
||||
|
||||
-- This table requires specific settings, create before getting into things
|
||||
create table mx_call_dist_table_replica(id int, val int);
|
||||
select create_distributed_table('mx_call_dist_table_replica', 'id');
|
||||
insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5);
|
||||
|
||||
set citus.shard_replication_factor to 1;
|
||||
set citus.replication_model to 'streaming';
|
||||
|
||||
CREATE TABLE mx_call_dist_table(id int);
|
||||
select create_distributed_table('mx_call_dist_table', 'id');
|
||||
insert into mx_call_dist_table values (1),(2),(3),(4),(5);
|
||||
create schema multi_mx_call;
|
||||
set search_path to multi_mx_call, public;
|
||||
|
||||
--
|
||||
-- Utility UDFs
|
||||
--
|
||||
|
||||
-- 1. Marks the given procedure as colocated with the given table.
|
||||
-- 2. Marks the argument index with which we route the procedure.
|
||||
CREATE PROCEDURE colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
|
||||
LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
update citus.pg_dist_object
|
||||
set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid
|
||||
from pg_proc, pg_dist_partition
|
||||
where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid;
|
||||
END;$$;
|
||||
|
||||
|
||||
--
|
||||
-- Create tables and procedures we want to use in tests
|
||||
--
|
||||
create table mx_call_dist_table_1(id int, val int);
|
||||
select create_distributed_table('mx_call_dist_table_1', 'id');
|
||||
insert into mx_call_dist_table_1 values (3,1),(4,5),(9,2),(6,5),(3,5);
|
||||
|
||||
create table mx_call_dist_table_2(id int, val int);
|
||||
select create_distributed_table('mx_call_dist_table_2', 'id');
|
||||
insert into mx_call_dist_table_2 values (1,1),(1,2),(2,2),(3,3),(3,4);
|
||||
|
||||
create table mx_call_dist_table_ref(id int, val int);
|
||||
select create_reference_table('mx_call_dist_table_ref');
|
||||
insert into mx_call_dist_table_ref values (2,7),(1,8),(2,8),(1,8),(2,8);
|
||||
|
||||
create type mx_call_enum as enum ('A', 'S', 'D', 'F');
|
||||
create table mx_call_dist_table_enum(id int, key mx_call_enum);
|
||||
select create_distributed_table('mx_call_dist_table_enum', 'key');
|
||||
insert into mx_call_dist_table_enum values (1,'S'),(2,'A'),(3,'D'),(4,'F');
|
||||
|
||||
CREATE PROCEDURE mx_call_proc(x int, INOUT y int) LANGUAGE plpgsql AS $$
|
||||
|
||||
CREATE PROCEDURE mx_call_proc(x int, INOUT y int)
|
||||
LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
-- groupid is 0 in coordinator and non-zero in workers, so by using it here
|
||||
-- we make sure the procedure is being executed in the worker.
|
||||
y := x + (select case groupid when 0 then 1 else 0 end from pg_dist_local_group);
|
||||
y := y + (select sum(id) from mx_call_dist_table);
|
||||
END;
|
||||
$$;
|
||||
-- we also make sure that we can run distributed queries in the procedures
|
||||
-- that are routed to the workers.
|
||||
y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id);
|
||||
END;$$;
|
||||
|
||||
CREATE PROCEDURE mx_call_proc_asdf(INOUT x mx_call_enum, INOUT y mx_call_enum) LANGUAGE plpgsql AS $$
|
||||
-- create another procedure which verifies:
|
||||
-- 1. we work fine with multiple return columns
|
||||
-- 2. we work fine in combination with custom types
|
||||
CREATE PROCEDURE mx_call_proc_custom_types(INOUT x mx_call_enum, INOUT y mx_call_enum)
|
||||
LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
y := x;
|
||||
x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group);
|
||||
END;
|
||||
$$;
|
||||
END;$$;
|
||||
|
||||
CREATE FUNCTION mx_call_add(int, int) RETURNS int
|
||||
AS 'select $1 + $2;'
|
||||
LANGUAGE SQL
|
||||
IMMUTABLE
|
||||
RETURNS NULL ON NULL INPUT;
|
||||
|
||||
call mx_call_proc(2, 0);
|
||||
call mx_call_proc_asdf('S', 'A');
|
||||
-- Test that undistributed procedures have no issue executing
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
||||
|
||||
-- Mark both procedures as distributed ...
|
||||
select create_distributed_function('mx_call_proc(int,int)');
|
||||
update citus.pg_dist_object
|
||||
set distribution_argument_index = 1, colocationid = pg_dist_partition.colocationid
|
||||
from pg_proc, pg_dist_partition
|
||||
where proname = 'mx_call_proc' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass;
|
||||
select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_call_enum)');
|
||||
|
||||
select create_distributed_function('mx_call_proc_asdf(mx_call_enum,mx_call_enum)');
|
||||
update citus.pg_dist_object
|
||||
set distribution_argument_index = 1, colocationid = pg_dist_partition.colocationid
|
||||
from pg_proc, pg_dist_partition
|
||||
where proname = 'mx_call_proc_asdf' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass;
|
||||
-- We still don't route them to the workers, because they aren't
|
||||
-- colocated with any distributed tables.
|
||||
SET client_min_messages TO DEBUG1;
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
||||
|
||||
call mx_call_proc(2, 0);
|
||||
call mx_call_proc_asdf('S', 'A');
|
||||
-- Mark them as colocated with a table. Now we should route them to workers.
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
||||
|
||||
set client_min_messages to DEBUG2;
|
||||
-- We don't allow distributing calls inside transactions
|
||||
begin;
|
||||
select sum(id) from mx_call_dist_table;
|
||||
call mx_call_proc(2, 0);
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
commit;
|
||||
|
||||
update citus.pg_dist_object
|
||||
set distribution_argument_index = -1, colocationid = pg_dist_partition.colocationid
|
||||
from pg_proc, pg_dist_partition
|
||||
where proname = 'mx_call_proc' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass;
|
||||
call mx_call_proc(2, 0);
|
||||
update citus.pg_dist_object
|
||||
set distribution_argument_index = 1, colocationid = pg_dist_partition.colocationid
|
||||
from pg_proc, pg_dist_partition
|
||||
where proname = 'mx_call_proc' and oid = objid and pg_dist_partition.logicalrelid = 'mx_call_dist_table'::regclass;
|
||||
-- Drop the table colocated with mx_call_proc_custom_types. Now it shouldn't
|
||||
-- be routed to workers anymore.
|
||||
SET client_min_messages TO NOTICE;
|
||||
drop table mx_call_dist_table_enum;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
||||
|
||||
call mx_call_proc(2, mx_call_add(3, 4));
|
||||
-- Make sure we do bounds checking on distributed argument index
|
||||
-- This also tests that we have cache invalidation for pg_dist_object updates
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1);
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2);
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
|
||||
-- We don't currently support colocating with reference tables
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1);
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
|
||||
-- We don't currently support colocating with replicated tables
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1);
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
SET client_min_messages TO NOTICE;
|
||||
drop table mx_call_dist_table_replica;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
|
||||
|
||||
-- Test that we handle transactional constructs correctly inside a procedure
|
||||
-- that is routed to the workers.
|
||||
CREATE PROCEDURE mx_call_proc_tx(x int) LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
INSERT INTO mx_call_dist_table_1 VALUES (x, -1), (x+1, 4);
|
||||
COMMIT;
|
||||
UPDATE mx_call_dist_table_1 SET val = val+1 WHERE id >= x;
|
||||
ROLLBACK;
|
||||
-- Now do the final update!
|
||||
UPDATE mx_call_dist_table_1 SET val = val-1 WHERE id >= x;
|
||||
END;$$;
|
||||
|
||||
-- before distribution ...
|
||||
CALL multi_mx_call.mx_call_proc_tx(10);
|
||||
-- after distribution ...
|
||||
select create_distributed_function('mx_call_proc_tx(int)');
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc_tx', 'mx_call_dist_table_1'::regclass, 0);
|
||||
CALL multi_mx_call.mx_call_proc_tx(20);
|
||||
SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val;
|
||||
|
||||
-- Test that we properly propagate errors raised from procedures.
|
||||
CREATE PROCEDURE mx_call_proc_raise(x int) LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
RAISE WARNING 'warning';
|
||||
RAISE EXCEPTION 'error';
|
||||
END;$$;
|
||||
select create_distributed_function('mx_call_proc_raise(int)');
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc_raise', 'mx_call_dist_table_1'::regclass, 0);
|
||||
call multi_mx_call.mx_call_proc_raise(2);
|
||||
|
||||
|
||||
-- Test that we don't propagate to non-metadata worker nodes
|
||||
select stop_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
select stop_metadata_sync_to_node('localhost', :worker_2_port);
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
SET client_min_messages TO NOTICE;
|
||||
select start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
select start_metadata_sync_to_node('localhost', :worker_2_port);
|
||||
SET client_min_messages TO DEBUG1;
|
||||
|
||||
--
|
||||
-- Test non-const parameter values
|
||||
--
|
||||
CREATE FUNCTION mx_call_add(int, int) RETURNS int
|
||||
AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE;
|
||||
SELECT create_distributed_function('mx_call_add(int,int)');
|
||||
|
||||
-- non-const distribution parameters cannot be pushed down
|
||||
call multi_mx_call.mx_call_proc(2, mx_call_add(3, 4));
|
||||
|
||||
-- non-const parameter can be pushed down
|
||||
call multi_mx_call.mx_call_proc(multi_mx_call.mx_call_add(3, 4), 2);
|
||||
|
||||
-- volatile parameter cannot be pushed down
|
||||
call multi_mx_call.mx_call_proc(floor(random())::int, 2);
|
||||
|
||||
--
|
||||
-- clean-up
|
||||
--
|
||||
reset client_min_messages;
|
||||
DROP TABLE mx_call_dist_table;
|
||||
DROP PROCEDURE mx_call_proc;
|
||||
reset citus.shard_replication_factor;
|
||||
reset citus.replication_model;
|
||||
reset search_path;
|
||||
|
||||
\set VERBOSITY terse
|
||||
drop schema multi_mx_call cascade;
|
||||
\set VERBOSITY default
|
||||
|
||||
|
|
Loading…
Reference in New Issue