mirror of https://github.com/citusdata/citus.git
1304 lines
62 KiB
Plaintext
1304 lines
62 KiB
Plaintext
CREATE SCHEMA function_propagation_schema;
|
|
SET search_path TO 'function_propagation_schema';
|
|
-- Check whether supported dependencies can be distributed while propagating functions
|
|
-- Check types
|
|
SET citus.enable_metadata_sync TO OFF;
|
|
CREATE TYPE function_prop_type AS (a int, b int);
|
|
RESET citus.enable_metadata_sync;
|
|
CREATE OR REPLACE FUNCTION func_1(param_1 function_prop_type)
|
|
RETURNS int
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
-- Check all dependent objects and function depends on all nodes
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema'::regnamespace::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(schema,{function_propagation_schema},{})
|
|
(1 row)
|
|
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.function_prop_type'::regtype::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(type,{function_propagation_schema.function_prop_type},{})
|
|
(1 row)
|
|
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_1'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,func_1}",{function_propagation_schema.function_prop_type})
|
|
(1 row)
|
|
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema'::regnamespace::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (schema,{function_propagation_schema},{})
|
|
localhost | 57638 | t | (schema,{function_propagation_schema},{})
|
|
(2 rows)
|
|
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.function_prop_type'::regtype::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (type,{function_propagation_schema.function_prop_type},{})
|
|
localhost | 57638 | t | (type,{function_propagation_schema.function_prop_type},{})
|
|
(2 rows)
|
|
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_1'::regproc::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (function,"{function_propagation_schema,func_1}",{function_propagation_schema.function_prop_type})
|
|
localhost | 57638 | t | (function,"{function_propagation_schema,func_1}",{function_propagation_schema.function_prop_type})
|
|
(2 rows)
|
|
|
|
SET citus.enable_metadata_sync TO OFF;
|
|
CREATE TYPE function_prop_type_2 AS (a int, b int);
|
|
RESET citus.enable_metadata_sync;
|
|
CREATE OR REPLACE FUNCTION func_2(param_1 int)
|
|
RETURNS function_prop_type_2
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.function_prop_type_2'::regtype::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(type,{function_propagation_schema.function_prop_type_2},{})
|
|
(1 row)
|
|
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_2'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,func_2}",{integer})
|
|
(1 row)
|
|
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.function_prop_type_2'::regtype::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (type,{function_propagation_schema.function_prop_type_2},{})
|
|
localhost | 57638 | t | (type,{function_propagation_schema.function_prop_type_2},{})
|
|
(2 rows)
|
|
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_2'::regproc::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (function,"{function_propagation_schema,func_2}",{integer})
|
|
localhost | 57638 | t | (function,"{function_propagation_schema,func_2}",{integer})
|
|
(2 rows)
|
|
|
|
-- Have a separate check for type created in transaction
|
|
BEGIN;
|
|
SET LOCAL citus.create_object_propagation TO deferred;
|
|
CREATE TYPE function_prop_type_3 AS (a int, b int);
|
|
COMMIT;
|
|
-- Objects in the body part is not found as dependency
|
|
CREATE OR REPLACE FUNCTION func_3(param_1 int)
|
|
RETURNS int
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
DECLARE
|
|
internal_param1 function_prop_type_3;
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.function_prop_type_3'::regtype::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_3'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,func_3}",{integer})
|
|
(1 row)
|
|
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_3'::regproc::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (function,"{function_propagation_schema,func_3}",{integer})
|
|
localhost | 57638 | t | (function,"{function_propagation_schema,func_3}",{integer})
|
|
(2 rows)
|
|
|
|
-- Check table
|
|
CREATE TABLE function_prop_table(a int, b int);
|
|
-- Non-distributed table is not distributed as dependency
|
|
CREATE OR REPLACE FUNCTION func_4(param_1 function_prop_table)
|
|
RETURNS int
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
WARNING: "function func_4(function_prop_table)" has dependency to "table function_prop_table" that is not in Citus' metadata
|
|
DETAIL: "function func_4(function_prop_table)" will be created only locally
|
|
HINT: Distribute "table function_prop_table" first to distribute "function func_4(function_prop_table)"
|
|
CREATE OR REPLACE FUNCTION func_5(param_1 int)
|
|
RETURNS function_prop_table
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
WARNING: "function func_5(integer)" has dependency to "table function_prop_table" that is not in Citus' metadata
|
|
DETAIL: "function func_5(integer)" will be created only locally
|
|
HINT: Distribute "table function_prop_table" first to distribute "function func_5(integer)"
|
|
-- Functions can be created with distributed table dependency
|
|
SELECT create_distributed_table('function_prop_table', 'a');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE OR REPLACE FUNCTION func_6(param_1 function_prop_table)
|
|
RETURNS int
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_6'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,func_6}",{function_propagation_schema.function_prop_table})
|
|
(1 row)
|
|
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_6'::regproc::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (function,"{function_propagation_schema,func_6}",{function_propagation_schema.function_prop_table})
|
|
localhost | 57638 | t | (function,"{function_propagation_schema,func_6}",{function_propagation_schema.function_prop_table})
|
|
(2 rows)
|
|
|
|
-- Views are not supported
|
|
CREATE VIEW function_prop_view AS SELECT * FROM function_prop_table;
|
|
CREATE OR REPLACE FUNCTION func_7(param_1 function_prop_view)
|
|
RETURNS int
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
WARNING: "function func_7(function_prop_view)" has dependency on unsupported object "view function_prop_view"
|
|
DETAIL: "function func_7(function_prop_view)" will be created only locally
|
|
CREATE OR REPLACE FUNCTION func_8(param_1 int)
|
|
RETURNS function_prop_view
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
WARNING: "function func_8(integer)" has dependency on unsupported object "view function_prop_view"
|
|
DETAIL: "function func_8(integer)" will be created only locally
|
|
-- Check within transaction
|
|
BEGIN;
|
|
CREATE TYPE type_in_transaction AS (a int, b int);
|
|
CREATE OR REPLACE FUNCTION func_in_transaction(param_1 type_in_transaction)
|
|
RETURNS int
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.type_in_transaction'::regtype::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(type,{function_propagation_schema.type_in_transaction},{})
|
|
(1 row)
|
|
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,func_in_transaction}",{function_propagation_schema.type_in_transaction})
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
-- Show that recreating it outside transaction distributes the function and dependencies
|
|
CREATE OR REPLACE FUNCTION func_in_transaction(param_1 type_in_transaction)
|
|
RETURNS int
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.type_in_transaction'::regtype::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(type,{function_propagation_schema.type_in_transaction},{})
|
|
(1 row)
|
|
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,func_in_transaction}",{function_propagation_schema.type_in_transaction})
|
|
(1 row)
|
|
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.type_in_transaction'::regtype::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (type,{function_propagation_schema.type_in_transaction},{})
|
|
localhost | 57638 | t | (type,{function_propagation_schema.type_in_transaction},{})
|
|
(2 rows)
|
|
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction'::regproc::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction}",{function_propagation_schema.type_in_transaction})
|
|
localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction}",{function_propagation_schema.type_in_transaction})
|
|
(2 rows)
|
|
|
|
-- Test for SQL function with unsupported object in function body
|
|
CREATE TABLE table_in_sql_body(id int);
|
|
CREATE FUNCTION max_of_table()
|
|
RETURNS int
|
|
LANGUAGE SQL AS
|
|
$$
|
|
SELECT max(id) FROM table_in_sql_body
|
|
$$;
|
|
-- Show that only function has propagated, since the table is not resolved as dependency
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.type_in_transaction'::regclass::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.max_of_table'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,max_of_table}",{})
|
|
(1 row)
|
|
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.max_of_table'::regproc::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (function,"{function_propagation_schema,max_of_table}",{})
|
|
localhost | 57638 | t | (function,"{function_propagation_schema,max_of_table}",{})
|
|
(2 rows)
|
|
|
|
-- Check extension owned table
|
|
CREATE TABLE extension_owned_table(a int);
|
|
SELECT run_command_on_workers($$
|
|
CREATE TABLE function_propagation_schema.extension_owned_table(a int);
|
|
$$
|
|
);
|
|
run_command_on_workers
|
|
---------------------------------------------------------------------
|
|
(localhost,57637,t,"CREATE TABLE")
|
|
(localhost,57638,t,"CREATE TABLE")
|
|
(2 rows)
|
|
|
|
CREATE EXTENSION seg;
|
|
ALTER EXTENSION seg ADD TABLE extension_owned_table;
|
|
NOTICE: Citus does not propagate adding/dropping member objects
|
|
HINT: You can add/drop the member objects on the workers as well.
|
|
SELECT run_command_on_workers($$
|
|
ALTER EXTENSION seg ADD TABLE function_propagation_schema.extension_owned_table;
|
|
$$);
|
|
run_command_on_workers
|
|
---------------------------------------------------------------------
|
|
(localhost,57637,t,"ALTER EXTENSION")
|
|
(localhost,57638,t,"ALTER EXTENSION")
|
|
(2 rows)
|
|
|
|
CREATE OR REPLACE FUNCTION func_for_ext_check(param_1 extension_owned_table)
|
|
RETURNS int
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
-- Show that functions are propagated (or not) as a dependency
|
|
-- Function as a default column
|
|
BEGIN;
|
|
CREATE OR REPLACE FUNCTION func_in_transaction_def()
|
|
RETURNS int
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
-- Function shouldn't be propagated within transaction
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_def'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
CREATE TABLE table_to_prop_func(id int, col_1 int default func_in_transaction_def());
|
|
SELECT create_distributed_table('table_to_prop_func','id');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- Function should be marked as distributed after distributing the table that depends on it
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_def'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,func_in_transaction_def}",{})
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
-- Function should be marked as distributed on the worker after committing changes
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_def'::regproc::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction_def}",{})
|
|
localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction_def}",{})
|
|
(2 rows)
|
|
|
|
-- Multiple functions as a default column
|
|
BEGIN;
|
|
CREATE OR REPLACE FUNCTION func_in_transaction_1()
|
|
RETURNS int
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
CREATE OR REPLACE FUNCTION func_in_transaction_2()
|
|
RETURNS int
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
-- Functions shouldn't be propagated within transaction
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_1'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_2'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
CREATE TABLE table_to_prop_func_2(id int, col_1 int default func_in_transaction_1() + func_in_transaction_2());
|
|
SELECT create_distributed_table('table_to_prop_func_2','id');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- Functions should be marked as distribued after distributing the table that depends on it
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_1'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,func_in_transaction_1}",{})
|
|
(1 row)
|
|
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_2'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,func_in_transaction_2}",{})
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
-- Functions should be marked as distributed on the worker after committing changes
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_1'::regproc::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction_1}",{})
|
|
localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction_1}",{})
|
|
(2 rows)
|
|
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_2'::regproc::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction_2}",{})
|
|
localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction_2}",{})
|
|
(2 rows)
|
|
|
|
-- If function has dependency on non-distributed table it should error out
|
|
BEGIN;
|
|
CREATE TABLE non_dist_table(id int);
|
|
CREATE OR REPLACE FUNCTION func_in_transaction_3(param_1 non_dist_table)
|
|
RETURNS int
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
CREATE TABLE table_to_prop_func_3(id int, col_1 int default func_in_transaction_3(NULL::non_dist_table));
|
|
-- It should error out as there is a non-distributed table dependency
|
|
SELECT create_distributed_table('table_to_prop_func_3','id');
|
|
ERROR: "table table_to_prop_func_3" has dependency to "table non_dist_table" that is not in Citus' metadata
|
|
HINT: Distribute "table non_dist_table" first to distribute "table table_to_prop_func_3"
|
|
COMMIT;
|
|
-- Adding a column with default value should propagate the function
|
|
BEGIN;
|
|
CREATE TABLE table_to_prop_func_4(id int);
|
|
SELECT create_distributed_table('table_to_prop_func_4', 'id');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE OR REPLACE FUNCTION func_in_transaction_4()
|
|
RETURNS int
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
-- Function shouldn't be propagated within transaction
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_4'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
ALTER TABLE table_to_prop_func_4 ADD COLUMN col_1 int default function_propagation_schema.func_in_transaction_4();
|
|
-- Function should be marked as distributed after adding the column
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_4'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,func_in_transaction_4}",{})
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
-- Functions should be marked as distributed on the worker after committing changes
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_4'::regproc::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction_4}",{})
|
|
localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction_4}",{})
|
|
(2 rows)
|
|
|
|
-- Adding a column with default function depending on non-distributable table should fail
|
|
BEGIN;
|
|
CREATE TABLE non_dist_table_for_function(id int);
|
|
CREATE OR REPLACE FUNCTION non_dist_func(col_1 non_dist_table_for_function)
|
|
RETURNS int
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
CREATE TABLE table_to_dist(id int);
|
|
SELECT create_distributed_table('table_to_dist', 'id');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
ALTER TABLE table_to_dist ADD COLUMN col_1 int default function_propagation_schema.non_dist_func(NULL::non_dist_table_for_function);
|
|
ERROR: "table table_to_dist" has dependency to "table non_dist_table_for_function" that is not in Citus' metadata
|
|
HINT: Distribute "table non_dist_table_for_function" first to distribute "table table_to_dist"
|
|
ROLLBACK;
|
|
-- Adding multiple columns with default values should propagate the function
|
|
BEGIN;
|
|
CREATE OR REPLACE FUNCTION func_in_transaction_5()
|
|
RETURNS int
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
CREATE OR REPLACE FUNCTION func_in_transaction_6()
|
|
RETURNS int
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
-- Functions shouldn't be propagated within transaction
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_5'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_6'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
CREATE TABLE table_to_prop_func_5(id int, col_1 int default func_in_transaction_5(), col_2 int default func_in_transaction_6());
|
|
SELECT create_distributed_table('table_to_prop_func_5', 'id');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- Functions should be marked as distributed after adding the column
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_5'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,func_in_transaction_5}",{})
|
|
(1 row)
|
|
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_6'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,func_in_transaction_6}",{})
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
-- Functions should be marked as distributed on the worker after committing changes
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_5'::regproc::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction_5}",{})
|
|
localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction_5}",{})
|
|
(2 rows)
|
|
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_6'::regproc::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction_6}",{})
|
|
localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction_6}",{})
|
|
(2 rows)
|
|
|
|
-- Adding a constraint with function check should propagate the function
|
|
BEGIN;
|
|
CREATE OR REPLACE FUNCTION func_in_transaction_7(param_1 int)
|
|
RETURNS boolean
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return param_1 > 5;
|
|
END;
|
|
$$;
|
|
-- Functions shouldn't be propagated within transaction
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_7'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
CREATE TABLE table_to_prop_func_6(id int, col_1 int check (function_propagation_schema.func_in_transaction_7(col_1)));
|
|
SELECT create_distributed_table('table_to_prop_func_6', 'id');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- Function should be marked as distributed after adding the column
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_7'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,func_in_transaction_7}",{integer})
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
-- Function should be marked as distributed on the worker after committing changes
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_7'::regproc::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction_7}",{integer})
|
|
localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction_7}",{integer})
|
|
(2 rows)
|
|
|
|
-- Adding a constraint with multiple functions check should propagate the function
|
|
BEGIN;
|
|
CREATE OR REPLACE FUNCTION func_in_transaction_8(param_1 int)
|
|
RETURNS boolean
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return param_1 > 5;
|
|
END;
|
|
$$;
|
|
CREATE OR REPLACE FUNCTION func_in_transaction_9(param_1 int)
|
|
RETURNS boolean
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return param_1 > 5;
|
|
END;
|
|
$$;
|
|
-- Functions shouldn't be propagated within transaction
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_8'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_9'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
CREATE TABLE table_to_prop_func_7(id int, col_1 int check (function_propagation_schema.func_in_transaction_8(col_1) and function_propagation_schema.func_in_transaction_9(col_1)));
|
|
SELECT create_distributed_table('table_to_prop_func_7', 'id');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- Function should be marked as distributed after adding the column
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_8'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,func_in_transaction_8}",{integer})
|
|
(1 row)
|
|
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_9'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,func_in_transaction_9}",{integer})
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
-- Functions should be marked as distributed on the worker after committing changes
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_8'::regproc::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction_8}",{integer})
|
|
localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction_8}",{integer})
|
|
(2 rows)
|
|
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_9'::regproc::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction_9}",{integer})
|
|
localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction_9}",{integer})
|
|
(2 rows)
|
|
|
|
-- Adding a column with constraint should propagate the function
|
|
BEGIN;
|
|
CREATE TABLE table_to_prop_func_8(id int, col_1 int);
|
|
SELECT create_distributed_table('table_to_prop_func_8', 'id');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE OR REPLACE FUNCTION func_in_transaction_10(param_1 int)
|
|
RETURNS boolean
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return param_1 > 5;
|
|
END;
|
|
$$;
|
|
-- Functions shouldn't be propagated within transaction
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_10'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
ALTER TABLE table_to_prop_func_8 ADD CONSTRAINT col1_check CHECK (function_propagation_schema.func_in_transaction_10(col_1));
|
|
-- Function should be marked as distributed after adding the constraint
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_10'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,func_in_transaction_10}",{integer})
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
-- Function should be marked as distributed on the worker after committing changes
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_10'::regproc::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction_10}",{integer})
|
|
localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction_10}",{integer})
|
|
(2 rows)
|
|
|
|
-- If constraint depends on a non-distributed table it should error out
|
|
BEGIN;
|
|
CREATE TABLE local_table_for_const(id int);
|
|
CREATE OR REPLACE FUNCTION func_in_transaction_11(param_1 int, param_2 local_table_for_const)
|
|
RETURNS boolean
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return param_1 > 5;
|
|
END;
|
|
$$;
|
|
CREATE TABLE table_to_prop_func_9(id int, col_1 int check (func_in_transaction_11(col_1, NULL::local_table_for_const)));
|
|
-- It should error out since there is non-distributed table dependency exists
|
|
SELECT create_distributed_table('table_to_prop_func_9', 'id');
|
|
ERROR: "table table_to_prop_func_9" has dependency to "table local_table_for_const" that is not in Citus' metadata
|
|
HINT: Distribute "table local_table_for_const" first to distribute "table table_to_prop_func_9"
|
|
COMMIT;
|
|
-- Show that function as a part of generated always is supporte
|
|
BEGIN;
|
|
CREATE OR REPLACE FUNCTION non_sense_func_for_generated_always()
|
|
RETURNS int
|
|
LANGUAGE plpgsql IMMUTABLE AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
-- Functions shouldn't be propagated within transaction
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.non_sense_func_for_generated_always'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
CREATE TABLE people (
|
|
id int,
|
|
height_cm numeric,
|
|
height_in numeric GENERATED ALWAYS AS (height_cm / non_sense_func_for_generated_always()) STORED);
|
|
SELECT create_distributed_table('people', 'id');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- Show that function is distributed after distributing the table
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.non_sense_func_for_generated_always'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,non_sense_func_for_generated_always}",{})
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
-- Show that functions depending table via rule are also distributed
|
|
BEGIN;
|
|
CREATE OR REPLACE FUNCTION func_for_rule()
|
|
RETURNS int
|
|
LANGUAGE plpgsql STABLE AS
|
|
$$
|
|
BEGIN
|
|
return 4;
|
|
END;
|
|
$$;
|
|
-- Functions shouldn't be propagated within transaction
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_for_rule'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
CREATE TABLE table_1_for_rule(id int, col_1 int);
|
|
CREATE TABLE table_2_for_rule(id int, col_1 int);
|
|
CREATE RULE rule_1 AS ON UPDATE TO table_1_for_rule DO ALSO UPDATE table_2_for_rule SET col_1 = col_1 * func_for_rule();
|
|
SELECT create_distributed_table('table_1_for_rule','id');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- Functions should be distributed after distributing the table
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_for_rule'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,func_for_rule}",{})
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
-- Function should be marked as distributed on the worker after committing changes
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_for_rule'::regproc::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (function,"{function_propagation_schema,func_for_rule}",{})
|
|
localhost | 57638 | t | (function,"{function_propagation_schema,func_for_rule}",{})
|
|
(2 rows)
|
|
|
|
-- Show that functions as partitioning functions are supported
|
|
BEGIN;
|
|
CREATE OR REPLACE FUNCTION non_sense_func_for_partitioning(int)
|
|
RETURNS int
|
|
LANGUAGE plpgsql IMMUTABLE AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
-- Functions shouldn't be propagated within transaction
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.non_sense_func_for_partitioning'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
CREATE TABLE partitioned_table_to_test_func_prop(id INT, a INT) PARTITION BY RANGE (non_sense_func_for_partitioning(id));
|
|
SELECT create_distributed_table('partitioned_table_to_test_func_prop', 'id');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- Show that function is distributed after distributing the table
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.non_sense_func_for_partitioning'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,non_sense_func_for_partitioning}",{integer})
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
-- Function should be marked as distributed on the worker after committing changes
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.non_sense_func_for_partitioning'::regproc::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (function,"{function_propagation_schema,non_sense_func_for_partitioning}",{integer})
|
|
localhost | 57638 | t | (function,"{function_propagation_schema,non_sense_func_for_partitioning}",{integer})
|
|
(2 rows)
|
|
|
|
-- Test function dependency on citus local table
|
|
BEGIN;
|
|
CREATE OR REPLACE FUNCTION func_in_transaction_for_local_table()
|
|
RETURNS int
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
-- Function shouldn't be propagated within transaction
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_for_local_table'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
CREATE TABLE citus_local_table_to_test_func(l1 int DEFAULT func_in_transaction_for_local_table());
|
|
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
|
|
NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata
|
|
?column?
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
SELECT citus_add_local_table_to_metadata('citus_local_table_to_test_func');
|
|
citus_add_local_table_to_metadata
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- Function should be marked as distributed after distributing the table that depends on it
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_for_local_table'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,func_in_transaction_for_local_table}",{})
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
-- Show that having a function dependency on exlude also works
|
|
BEGIN;
|
|
CREATE OR REPLACE FUNCTION exclude_bool_func()
|
|
RETURNS boolean
|
|
LANGUAGE plpgsql IMMUTABLE AS
|
|
$$
|
|
BEGIN
|
|
return true;
|
|
END;
|
|
$$;
|
|
-- Functions shouldn't be propagated within transaction
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.exclude_bool_func'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
CREATE TABLE exclusion_func_prop_table (id int, EXCLUDE USING btree (id WITH =) WHERE (exclude_bool_func()));
|
|
SELECT create_distributed_table('exclusion_func_prop_table', 'id');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- Function should be marked as distributed after distributing the table that depends on it
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.exclude_bool_func'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,exclude_bool_func}",{})
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
-- Function should be marked as distributed on the worker after committing changes
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.exclude_bool_func'::regproc::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (function,"{function_propagation_schema,exclude_bool_func}",{})
|
|
localhost | 57638 | t | (function,"{function_propagation_schema,exclude_bool_func}",{})
|
|
(2 rows)
|
|
|
|
-- Show that having a function dependency for index also works
|
|
BEGIN;
|
|
CREATE OR REPLACE FUNCTION func_for_index_predicate(col_1 int)
|
|
RETURNS boolean
|
|
LANGUAGE plpgsql IMMUTABLE AS
|
|
$$
|
|
BEGIN
|
|
return col_1 > 5;
|
|
END;
|
|
$$;
|
|
-- Functions shouldn't be propagated within transaction
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_for_index_predicate'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
CREATE TABLE table_to_check_func_index_dep (id int, col_2 int);
|
|
CREATE INDEX on table_to_check_func_index_dep(col_2) WHERE (func_for_index_predicate(col_2));
|
|
SELECT create_distributed_table('table_to_check_func_index_dep', 'id');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- Function should be marked as distributed after distributing the table that depends on it
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_for_index_predicate'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,func_for_index_predicate}",{integer})
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
-- Function should be marked as distributed on the worker after committing changes
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_for_index_predicate'::regproc::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (function,"{function_propagation_schema,func_for_index_predicate}",{integer})
|
|
localhost | 57638 | t | (function,"{function_propagation_schema,func_for_index_predicate}",{integer})
|
|
(2 rows)
|
|
|
|
-- Test function to function dependency
|
|
BEGIN;
|
|
CREATE OR REPLACE FUNCTION func_for_func_dep_1()
|
|
RETURNS int
|
|
LANGUAGE plpgsql IMMUTABLE AS
|
|
$$
|
|
BEGIN
|
|
return 5;
|
|
END;
|
|
$$;
|
|
CREATE TABLE func_dep_table(a int, b int default func_for_func_dep_1());
|
|
CREATE OR REPLACE FUNCTION func_for_func_dep_2(col_1 func_dep_table)
|
|
RETURNS int
|
|
LANGUAGE plpgsql IMMUTABLE AS
|
|
$$
|
|
BEGIN
|
|
return 5;
|
|
END;
|
|
$$;
|
|
SELECT create_distributed_table('func_dep_table', 'a');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- Function should be marked as distributed after distributing the table that depends on it
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_for_func_dep_1'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,func_for_func_dep_1}",{})
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
-- Function should be marked as distributed on the worker after committing changes
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_for_func_dep_1'::regproc::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (function,"{function_propagation_schema,func_for_func_dep_1}",{})
|
|
localhost | 57638 | t | (function,"{function_propagation_schema,func_for_func_dep_1}",{})
|
|
(2 rows)
|
|
|
|
-- Test function with SQL language and sequence dependency
|
|
BEGIN;
|
|
CREATE OR REPLACE FUNCTION func_in_transaction_def_with_seq(val bigint)
|
|
RETURNS bigint
|
|
LANGUAGE SQL AS
|
|
$$
|
|
SELECT 2 * val;
|
|
$$;
|
|
CREATE OR REPLACE FUNCTION func_in_transaction_def_with_func(val bigint)
|
|
RETURNS bigint
|
|
LANGUAGE SQL AS
|
|
$$
|
|
SELECT func_in_transaction_def_with_seq(val);
|
|
$$;
|
|
-- Function shouldn't be propagated within transaction
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_def_with_seq'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
CREATE SEQUENCE myseq;
|
|
CREATE TABLE table_to_prop_seq_func(id int, col_1 bigint default func_in_transaction_def_with_func(func_in_transaction_def_with_seq(nextval('myseq'))));
|
|
SELECT create_distributed_table('table_to_prop_seq_func','id');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- Function should be marked as distributed after distributing the table that depends on it
|
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_def_with_seq'::regproc::oid;
|
|
pg_identify_object_as_address
|
|
---------------------------------------------------------------------
|
|
(function,"{function_propagation_schema,func_in_transaction_def_with_seq}",{bigint})
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
-- Function should be marked as distributed on the worker after committing changes
|
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from pg_catalog.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_def_with_seq'::regproc::oid;$$) ORDER BY 1,2;
|
|
nodename | nodeport | success | result
|
|
---------------------------------------------------------------------
|
|
localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction_def_with_seq}",{bigint})
|
|
localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction_def_with_seq}",{bigint})
|
|
(2 rows)
|
|
|
|
-- Show that having a dependency on another dist table work out tx
|
|
CREATE TABLE loc_for_func_dist (
|
|
product_no integer,
|
|
name text,
|
|
price numeric CONSTRAINT positive_price CHECK (price > 0));
|
|
SELECT create_distributed_table('loc_for_func_dist', 'product_no');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE OR REPLACE FUNCTION non_sense_func_for_default_val(loc_for_func_dist)
|
|
RETURNS int
|
|
LANGUAGE plpgsql IMMUTABLE AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
CREATE TABLE table_non_for_func_dist (
|
|
a int,
|
|
b int DEFAULT non_sense_func_for_default_val(NULL::loc_for_func_dist));
|
|
SELECT create_distributed_table('table_non_for_func_dist', 'a');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SET citus.shard_replication_factor = 1;
|
|
-- test creating a colocated function
|
|
CREATE TABLE tbl_to_colocate (a int);
|
|
SELECT create_distributed_table('tbl_to_colocate', 'a');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- first test colocating function with a ref table
|
|
CREATE TABLE tbl_to_colocate_ref (a int);
|
|
SELECT create_reference_table('tbl_to_colocate_ref');
|
|
create_reference_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE FUNCTION func_to_colocate (a int) returns int as $$select 1;$$ language sql;
|
|
-- see the empty pg_dist_object entries
|
|
SELECT distribution_argument_index, colocationid, force_delegation FROM pg_catalog.pg_dist_object WHERE objid = 'func_to_colocate'::regproc;
|
|
distribution_argument_index | colocationid | force_delegation
|
|
---------------------------------------------------------------------
|
|
| |
|
|
(1 row)
|
|
|
|
-- colocate the function with ref table
|
|
SELECT create_distributed_function('func_to_colocate(int)', colocate_with:='tbl_to_colocate_ref');
|
|
create_distributed_function
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- see the pg_dist_object entry
|
|
SELECT distribution_argument_index, colocationid, force_delegation FROM pg_catalog.pg_dist_object WHERE objid = 'func_to_colocate'::regproc;
|
|
distribution_argument_index | colocationid | force_delegation
|
|
---------------------------------------------------------------------
|
|
| 10003 |
|
|
(1 row)
|
|
|
|
-- convert to non-delegated
|
|
SELECT create_distributed_function('func_to_colocate(int)');
|
|
create_distributed_function
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- show that the pg_dist_object fields are gone
|
|
SELECT distribution_argument_index, colocationid, force_delegation FROM pg_catalog.pg_dist_object WHERE objid = 'func_to_colocate'::regproc;
|
|
distribution_argument_index | colocationid | force_delegation
|
|
---------------------------------------------------------------------
|
|
| |
|
|
(1 row)
|
|
|
|
-- colocate the function with distributed table
|
|
SELECT create_distributed_function('func_to_colocate(int)','$1','tbl_to_colocate');
|
|
create_distributed_function
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- see the pg_dist_object entry
|
|
SELECT distribution_argument_index, colocationid, force_delegation FROM pg_catalog.pg_dist_object WHERE objid = 'func_to_colocate'::regproc;
|
|
distribution_argument_index | colocationid | force_delegation
|
|
---------------------------------------------------------------------
|
|
0 | 10005 |
|
|
(1 row)
|
|
|
|
-- try create or replace the same func
|
|
CREATE OR REPLACE FUNCTION func_to_colocate (a int) returns int as $$select 1;$$ language sql;
|
|
-- verify the pg_dist_object entry is the same
|
|
SELECT distribution_argument_index, colocationid, force_delegation FROM pg_catalog.pg_dist_object WHERE objid = 'func_to_colocate'::regproc;
|
|
distribution_argument_index | colocationid | force_delegation
|
|
---------------------------------------------------------------------
|
|
0 | 10005 |
|
|
(1 row)
|
|
|
|
-- convert to non-delegated
|
|
SELECT create_distributed_function('func_to_colocate(int)');
|
|
create_distributed_function
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- show that the pg_dist_object fields are gone
|
|
SELECT distribution_argument_index, colocationid, force_delegation FROM pg_catalog.pg_dist_object WHERE objid = 'func_to_colocate'::regproc;
|
|
distribution_argument_index | colocationid | force_delegation
|
|
---------------------------------------------------------------------
|
|
| |
|
|
(1 row)
|
|
|
|
-- force delegate
|
|
SELECT create_distributed_function('func_to_colocate(int)','$1','tbl_to_colocate', true);
|
|
create_distributed_function
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- show pg_dist_object fields
|
|
SELECT distribution_argument_index, colocationid, force_delegation FROM pg_catalog.pg_dist_object WHERE objid = 'func_to_colocate'::regproc;
|
|
distribution_argument_index | colocationid | force_delegation
|
|
---------------------------------------------------------------------
|
|
0 | 10005 | t
|
|
(1 row)
|
|
|
|
-- convert to non-delegated
|
|
SELECT create_distributed_function('func_to_colocate(int)');
|
|
create_distributed_function
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- show that the pg_dist_object fields are gone
|
|
SELECT distribution_argument_index, colocationid, force_delegation FROM pg_catalog.pg_dist_object WHERE objid = 'func_to_colocate'::regproc;
|
|
distribution_argument_index | colocationid | force_delegation
|
|
---------------------------------------------------------------------
|
|
| |
|
|
(1 row)
|
|
|
|
-- Show that causing circular dependency via functions and default values are not allowed
|
|
CREATE TABLE table_1_for_circ_dep(id int);
|
|
select create_distributed_table('table_1_for_circ_dep','id');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE OR REPLACE FUNCTION func_1_for_circ_dep(col_1 table_1_for_circ_dep)
|
|
RETURNS int
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
CREATE TABLE table_2_for_circ_dep(id int, col_2 int default func_1_for_circ_dep(NULL::table_1_for_circ_dep));
|
|
select create_distributed_table('table_2_for_circ_dep','id');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE OR REPLACE FUNCTION func_2_for_circ_dep(col_3 table_2_for_circ_dep)
|
|
RETURNS int
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return 1;
|
|
END;
|
|
$$;
|
|
-- It should error out due to circular dependency
|
|
ALTER TABLE table_1_for_circ_dep ADD COLUMN col_2 int default func_2_for_circ_dep(NULL::table_2_for_circ_dep);
|
|
ERROR: Citus can not handle circular dependencies between distributed objects
|
|
DETAIL: "table table_1_for_circ_dep" circularly depends itself, resolve circular dependency first
|
|
-- Show that causing circular dependency via functions and constraints are not allowed
|
|
CREATE TABLE table_1_for_circ_dep_2(id int, col_1 int);
|
|
select create_distributed_table('table_1_for_circ_dep_2','id');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE OR REPLACE FUNCTION func_1_for_circ_dep_2(param_1 int, param_2 table_1_for_circ_dep_2)
|
|
RETURNS boolean
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return param_1 > 5;
|
|
END;
|
|
$$;
|
|
CREATE TABLE table_2_for_circ_dep_2(id int, col_1 int check (func_1_for_circ_dep_2(col_1, NULL::table_1_for_circ_dep_2)));
|
|
select create_distributed_table('table_2_for_circ_dep_2','id');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE OR REPLACE FUNCTION func_2_for_circ_dep_2(param_1 int, param_2 table_2_for_circ_dep_2)
|
|
RETURNS boolean
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return param_1 > 5;
|
|
END;
|
|
$$;
|
|
-- It should error out due to circular dependency
|
|
ALTER TABLE table_1_for_circ_dep_2 ADD CONSTRAINT check_const_for_circ check (func_2_for_circ_dep_2(col_1, NULL::table_2_for_circ_dep_2));
|
|
ERROR: Citus can not handle circular dependencies between distributed objects
|
|
DETAIL: "table table_1_for_circ_dep_2" circularly depends itself, resolve circular dependency first
|
|
-- Show that causing circular dependency via functions and create_distributed_table are not allowed
|
|
CREATE TABLE table_1_for_circ_dep_3(id int, col_1 int);
|
|
CREATE OR REPLACE FUNCTION func_for_circ_dep_3(param_1 int, param_2 table_1_for_circ_dep_3)
|
|
RETURNS boolean
|
|
LANGUAGE plpgsql AS
|
|
$$
|
|
BEGIN
|
|
return param_1 > 5;
|
|
END;
|
|
$$;
|
|
WARNING: "function func_for_circ_dep_3(integer,table_1_for_circ_dep_3)" has dependency to "table table_1_for_circ_dep_3" that is not in Citus' metadata
|
|
DETAIL: "function func_for_circ_dep_3(integer,table_1_for_circ_dep_3)" will be created only locally
|
|
HINT: Distribute "table table_1_for_circ_dep_3" first to distribute "function func_for_circ_dep_3(integer,table_1_for_circ_dep_3)"
|
|
CREATE TABLE table_2_for_circ_dep_3(id int, col_1 int check (func_for_circ_dep_3(col_1, NULL::table_1_for_circ_dep_3)));
|
|
ALTER TABLE table_1_for_circ_dep_3 ADD COLUMN col_2 table_2_for_circ_dep_3;
|
|
-- It should error out due to circular dependency
|
|
SELECT create_distributed_table('table_1_for_circ_dep_3','id');
|
|
ERROR: Citus can not handle circular dependencies between distributed objects
|
|
DETAIL: "table table_1_for_circ_dep_3" circularly depends itself, resolve circular dependency first
|
|
-- will skip trying to propagate the function due to temp schema
|
|
CREATE FUNCTION pg_temp.temp_func(group_size BIGINT) RETURNS SETOF integer[]
|
|
AS $$
|
|
SELECT array_agg(s) OVER w
|
|
FROM generate_series(1,5) s
|
|
WINDOW w AS (ORDER BY s ROWS BETWEEN CURRENT ROW AND GROUP_SIZE FOLLOWING)
|
|
$$ LANGUAGE SQL STABLE;
|
|
WARNING: "function pg_temp_xxx.temp_func(bigint)" has dependency on unsupported object "schema pg_temp_xxx"
|
|
DETAIL: "function pg_temp_xxx.temp_func(bigint)" will be created only locally
|
|
SELECT create_distributed_function('pg_temp.temp_func(BIGINT)');
|
|
ERROR: "function pg_temp_xxx.temp_func(bigint)" has dependency on unsupported object "schema pg_temp_xxx"
|
|
-- Show that support functions are supported
|
|
CREATE FUNCTION func_with_support(int, int) RETURNS bool
|
|
LANGUAGE internal STRICT IMMUTABLE PARALLEL SAFE
|
|
AS $$int4eq$$ SUPPORT generate_series_int8_support;
|
|
CREATE FUNCTION func_with_support_2(int, int) RETURNS bool
|
|
LANGUAGE internal STRICT IMMUTABLE PARALLEL SAFE
|
|
AS $$int4eq$$;
|
|
ALTER FUNCTION func_with_support_2(int, int) SUPPORT generate_series_int8_support;
|
|
RESET search_path;
|
|
SET client_min_messages TO WARNING;
|
|
DROP SCHEMA function_propagation_schema CASCADE;
|