From 12db1102c7a15a65e2b1fc3de32c7bbe5cf304d4 Mon Sep 17 00:00:00 2001 From: Burak Velioglu Date: Thu, 24 Feb 2022 11:27:03 +0300 Subject: [PATCH] Expand functions while resolving dependencies --- src/backend/distributed/metadata/dependency.c | 24 +- .../distributed/metadata/metadata_sync.c | 116 +++ src/include/distributed/metadata_sync.h | 1 + .../regress/expected/function_propagation.out | 704 ++++++++++++++++++ src/test/regress/sql/function_propagation.sql | 477 ++++++++++++ 5 files changed, 1321 insertions(+), 1 deletion(-) diff --git a/src/backend/distributed/metadata/dependency.c b/src/backend/distributed/metadata/dependency.c index bb25e2b6b..4b3595575 100644 --- a/src/backend/distributed/metadata/dependency.c +++ b/src/backend/distributed/metadata/dependency.c @@ -122,6 +122,7 @@ typedef struct ViewDependencyNode static List * GetRelationSequenceDependencyList(Oid relationId); +static List * GetRelationFunctionDependencyList(Oid relationId); static List * GetRelationTriggerFunctionDependencyList(Oid relationId); static List * GetRelationStatsSchemaDependencyList(Oid relationId); static List * GetRelationIndicesDependencyList(Oid relationId); @@ -1091,9 +1092,15 @@ ExpandCitusSupportedTypes(ObjectAddressCollector *collector, ObjectAddress targe * with them. */ List *sequenceDependencyList = GetRelationSequenceDependencyList(relationId); - result = list_concat(result, sequenceDependencyList); + /* + * Get the dependent functions for tables as columns has default values + * and contraints, then expand dependency list with them. + */ + List *functionDependencyList = GetRelationFunctionDependencyList(relationId); + result = list_concat(result, functionDependencyList); + /* * Tables could have indexes. Indexes themself could have dependencies that * need to be propagated. eg. TEXT SEARCH CONFIGRUATIONS. Here we add the @@ -1134,6 +1141,21 @@ GetRelationSequenceDependencyList(Oid relationId) } +/* + * GetRelationFunctionDependencyList returns the function dependency definition + * list for the given relation. + */ +static List * +GetRelationFunctionDependencyList(Oid relationId) +{ + List *dependentFunctionOids = GetDependentFunctionsWithRelation(relationId); + List *functionDependencyDefList = + CreateObjectAddressDependencyDefList(ProcedureRelationId, dependentFunctionOids); + + return functionDependencyDefList; +} + + /* * GetRelationStatsSchemaDependencyList returns a list of DependencyDefinition * objects for the schemas that statistics' of the relation with relationId depends. diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 490eeb4ca..651c70d35 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -28,9 +28,11 @@ #include "catalog/indexing.h" #include "catalog/pg_am.h" #include "catalog/pg_attrdef.h" +#include "catalog/pg_constraint.h" #include "catalog/pg_depend.h" #include "catalog/pg_foreign_server.h" #include "catalog/pg_namespace.h" +#include "catalog/pg_proc.h" #include "catalog/pg_type.h" #include "commands/async.h" #include "distributed/argutils.h" @@ -85,6 +87,7 @@ char *EnableManualMetadataChangesForUser = ""; static void EnsureObjectMetadataIsSane(int distributionArgumentIndex, int colocationId); +static List * GetFunctionDependenciesForObjects(ObjectAddress *objectAddress); static char * SchemaOwnerName(Oid objectId); static bool HasMetadataWorkers(void); static void CreateShellTableOnWorkers(Oid relationId); @@ -1544,6 +1547,119 @@ GetSequencesFromAttrDef(Oid attrdefOid) } +/* + * GetDependentFunctionsWithRelation returns the dependent functions for the + * given relation id. + */ +List * +GetDependentFunctionsWithRelation(Oid relationId) +{ + List *referencingObjects = NIL; + List *functionOids = NIL; + ScanKeyData key[2]; + HeapTuple tup; + + Relation depRel = table_open(DependRelationId, AccessShareLock); + + ScanKeyInit(&key[0], + Anum_pg_depend_refclassid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationRelationId)); + ScanKeyInit(&key[1], + Anum_pg_depend_refobjid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(relationId)); + + SysScanDesc scan = systable_beginscan(depRel, DependReferenceIndexId, true, + NULL, 2, key); + + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup); + + /* + * objsubid is nonzero only for table columns and zero for anything else. + * Since we are trying to find a dependency from the column of a table to + * function we've added deprec->refobjsubid != 0 check. + * + * We are following DEPENDENCY_AUTO for dependencies via column and + * DEPENDENCY_NORMAL anything else. Since only procedure dependencies + * for those dependencies will be obtained in GetFunctionDependenciesForObjects + * following both dependency types are not harmful. + */ + if ((deprec->refobjsubid != 0 && deprec->deptype == DEPENDENCY_AUTO) || + deprec->deptype == DEPENDENCY_NORMAL) + { + ObjectAddress *refAddress = palloc(sizeof(ObjectAddress)); + ObjectAddressSubSet(*refAddress, deprec->classid, + deprec->objid, + deprec->objsubid); + referencingObjects = lappend(referencingObjects, refAddress); + } + } + + systable_endscan(scan); + + table_close(depRel, AccessShareLock); + + ObjectAddress *referencingObject = NULL; + foreach_ptr(referencingObject, referencingObjects) + { + functionOids = list_concat(functionOids, + GetFunctionDependenciesForObjects(referencingObject)); + } + + return functionOids; +} + + +/* + * GetFunctionDependenciesForObjects returns a list of function OIDs that have + * dependency with the given object + */ +static List * +GetFunctionDependenciesForObjects(ObjectAddress *objectAddress) +{ + List *functionOids = NIL; + ScanKeyData key[3]; + HeapTuple tup; + + Relation depRel = table_open(DependRelationId, AccessShareLock); + + ScanKeyInit(&key[0], + Anum_pg_depend_classid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(objectAddress->classId)); + ScanKeyInit(&key[1], + Anum_pg_depend_objid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(objectAddress->objectId)); + ScanKeyInit(&key[2], + Anum_pg_depend_objsubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(objectAddress->objectSubId)); + + SysScanDesc scan = systable_beginscan(depRel, DependDependerIndexId, true, + NULL, 3, key); + + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup); + + if (deprec->refclassid == ProcedureRelationId) + { + functionOids = lappend_oid(functionOids, deprec->refobjid); + } + } + + systable_endscan(scan); + + table_close(depRel, AccessShareLock); + + return functionOids; +} + + /* * SequenceDependencyCommandList generates commands to record the dependency * of sequences on tables on the worker. This dependency does not exist by diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 69d500da4..31154a203 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -73,6 +73,7 @@ extern List * GetSequencesFromAttrDef(Oid attrdefOid); extern void GetDependentSequencesWithRelation(Oid relationId, List **attnumList, List **dependentSequenceList, AttrNumber attnum); +extern List * GetDependentFunctionsWithRelation(Oid relationId); extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum); #define DELETE_ALL_NODES "DELETE FROM pg_dist_node" diff --git a/src/test/regress/expected/function_propagation.out b/src/test/regress/expected/function_propagation.out index a40f1f9a2..ba7aebd5a 100644 --- a/src/test/regress/expected/function_propagation.out +++ b/src/test/regress/expected/function_propagation.out @@ -316,6 +316,710 @@ BEGIN return 1; END; $$; +-- Show that functions are propagated (or not) as a dependency +-- Function as a default column +BEGIN; +CREATE OR REPLACE FUNCTION func_in_transaction_def() +RETURNS int +LANGUAGE plpgsql AS +$$ +BEGIN + return 1; +END; +$$; +-- Function shouldn't be propagated within transaction +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_def'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- +(0 rows) + +CREATE TABLE table_to_prop_func(id int, col_1 int default func_in_transaction_def()); +SELECT create_distributed_table('table_to_prop_func','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Function should be marked as distributed after distributing the table that depends on it +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_def'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- + (function,"{function_propagation_schema,func_in_transaction_def}",{}) +(1 row) + +COMMIT; +-- Function should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_def'::regproc::oid;$$) ORDER BY 1,2; + nodename | nodeport | success | result +--------------------------------------------------------------------- + localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction_def}",{}) + localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction_def}",{}) +(2 rows) + +-- Multiple functions as a default column +BEGIN; +CREATE OR REPLACE FUNCTION func_in_transaction_1() +RETURNS int +LANGUAGE plpgsql AS +$$ +BEGIN + return 1; +END; +$$; +CREATE OR REPLACE FUNCTION func_in_transaction_2() +RETURNS int +LANGUAGE plpgsql AS +$$ +BEGIN + return 1; +END; +$$; +-- Functions shouldn't be propagated within transaction +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_1'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- +(0 rows) + +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_2'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- +(0 rows) + +CREATE TABLE table_to_prop_func_2(id int, col_1 int default func_in_transaction_1() + func_in_transaction_2()); +SELECT create_distributed_table('table_to_prop_func_2','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Functions should be marked as distribued after distributing the table that depends on it +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_1'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- + (function,"{function_propagation_schema,func_in_transaction_1}",{}) +(1 row) + +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_2'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- + (function,"{function_propagation_schema,func_in_transaction_2}",{}) +(1 row) + +COMMIT; +-- Functions should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_1'::regproc::oid;$$) ORDER BY 1,2; + nodename | nodeport | success | result +--------------------------------------------------------------------- + localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction_1}",{}) + localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction_1}",{}) +(2 rows) + +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_2'::regproc::oid;$$) ORDER BY 1,2; + nodename | nodeport | success | result +--------------------------------------------------------------------- + localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction_2}",{}) + localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction_2}",{}) +(2 rows) + +-- If function has dependency on non-distributed table it should error out +BEGIN; +CREATE TABLE non_dist_table(id int); +CREATE OR REPLACE FUNCTION func_in_transaction_3(param_1 non_dist_table) +RETURNS int +LANGUAGE plpgsql AS +$$ +BEGIN + return 1; +END; +$$; +CREATE TABLE table_to_prop_func_3(id int, col_1 int default func_in_transaction_3(NULL::non_dist_table)); +-- It should error out as there is a non-distributed table dependency +SELECT create_distributed_table('table_to_prop_func_3','id'); +ERROR: type function_propagation_schema.non_dist_table does not exist +CONTEXT: while executing command on localhost:xxxxx +COMMIT; +-- Adding a column with default value should propagate the function +BEGIN; +CREATE TABLE table_to_prop_func_4(id int); +SELECT create_distributed_table('table_to_prop_func_4', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE FUNCTION func_in_transaction_4() +RETURNS int +LANGUAGE plpgsql AS +$$ +BEGIN + return 1; +END; +$$; +-- Function shouldn't be propagated within transaction +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_4'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- +(0 rows) + +ALTER TABLE table_to_prop_func_4 ADD COLUMN col_1 int default function_propagation_schema.func_in_transaction_4(); +-- Function should be marked as distributed after adding the column +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_4'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- + (function,"{function_propagation_schema,func_in_transaction_4}",{}) +(1 row) + +COMMIT; +-- Functions should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_4'::regproc::oid;$$) ORDER BY 1,2; + nodename | nodeport | success | result +--------------------------------------------------------------------- + localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction_4}",{}) + localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction_4}",{}) +(2 rows) + +-- Adding multiple columns with default values should propagate the function +BEGIN; +CREATE OR REPLACE FUNCTION func_in_transaction_5() +RETURNS int +LANGUAGE plpgsql AS +$$ +BEGIN + return 1; +END; +$$; +CREATE OR REPLACE FUNCTION func_in_transaction_6() +RETURNS int +LANGUAGE plpgsql AS +$$ +BEGIN + return 1; +END; +$$; +-- Functions shouldn't be propagated within transaction +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_5'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- +(0 rows) + +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_6'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- +(0 rows) + +CREATE TABLE table_to_prop_func_5(id int, col_1 int default func_in_transaction_5(), col_2 int default func_in_transaction_6()); +SELECT create_distributed_table('table_to_prop_func_5', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Functions should be marked as distributed after adding the column +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_5'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- + (function,"{function_propagation_schema,func_in_transaction_5}",{}) +(1 row) + +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_6'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- + (function,"{function_propagation_schema,func_in_transaction_6}",{}) +(1 row) + +COMMIT; +-- Functions should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_5'::regproc::oid;$$) ORDER BY 1,2; + nodename | nodeport | success | result +--------------------------------------------------------------------- + localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction_5}",{}) + localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction_5}",{}) +(2 rows) + +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_6'::regproc::oid;$$) ORDER BY 1,2; + nodename | nodeport | success | result +--------------------------------------------------------------------- + localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction_6}",{}) + localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction_6}",{}) +(2 rows) + +-- Adding a constraint with function check should propagate the function +BEGIN; +CREATE OR REPLACE FUNCTION func_in_transaction_7(param_1 int) +RETURNS boolean +LANGUAGE plpgsql AS +$$ +BEGIN + return param_1 > 5; +END; +$$; +-- Functions shouldn't be propagated within transaction +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_7'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- +(0 rows) + +CREATE TABLE table_to_prop_func_6(id int, col_1 int check (function_propagation_schema.func_in_transaction_7(col_1))); +SELECT create_distributed_table('table_to_prop_func_6', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Function should be marked as distributed after adding the column +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_7'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- + (function,"{function_propagation_schema,func_in_transaction_7}",{integer}) +(1 row) + +COMMIT; +-- Function should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_7'::regproc::oid;$$) ORDER BY 1,2; + nodename | nodeport | success | result +--------------------------------------------------------------------- + localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction_7}",{integer}) + localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction_7}",{integer}) +(2 rows) + +-- Adding a constraint with multiple functions check should propagate the function +BEGIN; +CREATE OR REPLACE FUNCTION func_in_transaction_8(param_1 int) +RETURNS boolean +LANGUAGE plpgsql AS +$$ +BEGIN + return param_1 > 5; +END; +$$; +CREATE OR REPLACE FUNCTION func_in_transaction_9(param_1 int) +RETURNS boolean +LANGUAGE plpgsql AS +$$ +BEGIN + return param_1 > 5; +END; +$$; +-- Functions shouldn't be propagated within transaction +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_8'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- +(0 rows) + +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_9'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- +(0 rows) + +CREATE TABLE table_to_prop_func_7(id int, col_1 int check (function_propagation_schema.func_in_transaction_8(col_1) and function_propagation_schema.func_in_transaction_9(col_1))); +SELECT create_distributed_table('table_to_prop_func_7', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Function should be marked as distributed after adding the column +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_8'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- + (function,"{function_propagation_schema,func_in_transaction_8}",{integer}) +(1 row) + +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_9'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- + (function,"{function_propagation_schema,func_in_transaction_9}",{integer}) +(1 row) + +COMMIT; +-- Functions should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_8'::regproc::oid;$$) ORDER BY 1,2; + nodename | nodeport | success | result +--------------------------------------------------------------------- + localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction_8}",{integer}) + localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction_8}",{integer}) +(2 rows) + +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_9'::regproc::oid;$$) ORDER BY 1,2; + nodename | nodeport | success | result +--------------------------------------------------------------------- + localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction_9}",{integer}) + localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction_9}",{integer}) +(2 rows) + +-- Adding a column with constraint should propagate the function +BEGIN; +CREATE TABLE table_to_prop_func_8(id int, col_1 int); +SELECT create_distributed_table('table_to_prop_func_8', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE FUNCTION func_in_transaction_10(param_1 int) +RETURNS boolean +LANGUAGE plpgsql AS +$$ +BEGIN + return param_1 > 5; +END; +$$; +-- Functions shouldn't be propagated within transaction +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_10'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- +(0 rows) + +ALTER TABLE table_to_prop_func_8 ADD CONSTRAINT col1_check CHECK (function_propagation_schema.func_in_transaction_10(col_1)); +-- Function should be marked as distributed after adding the constraint +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_10'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- + (function,"{function_propagation_schema,func_in_transaction_10}",{integer}) +(1 row) + +COMMIT; +-- Function should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_10'::regproc::oid;$$) ORDER BY 1,2; + nodename | nodeport | success | result +--------------------------------------------------------------------- + localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction_10}",{integer}) + localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction_10}",{integer}) +(2 rows) + +-- If constraint depends on a non-distributed table it should error out +BEGIN; +CREATE TABLE local_table_for_const(id int); +CREATE OR REPLACE FUNCTION func_in_transaction_11(param_1 int, param_2 local_table_for_const) +RETURNS boolean +LANGUAGE plpgsql AS +$$ +BEGIN + return param_1 > 5; +END; +$$; +CREATE TABLE table_to_prop_func_9(id int, col_1 int check (func_in_transaction_11(col_1, NULL::local_table_for_const))); +-- It should error out since there is non-distributed table dependency exists +SELECT create_distributed_table('table_to_prop_func_9', 'id'); +ERROR: type function_propagation_schema.local_table_for_const does not exist +CONTEXT: while executing command on localhost:xxxxx +COMMIT; +-- Show that function as a part of generated always is supporte +BEGIN; + CREATE OR REPLACE FUNCTION non_sense_func_for_generated_always() + RETURNS int + LANGUAGE plpgsql IMMUTABLE AS + $$ + BEGIN + return 1; + END; + $$; + -- Functions shouldn't be propagated within transaction + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.non_sense_func_for_generated_always'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- +(0 rows) + + CREATE TABLE people ( + id int, + height_cm numeric, + height_in numeric GENERATED ALWAYS AS (height_cm / non_sense_func_for_generated_always()) STORED); + SELECT create_distributed_table('people', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + + -- Show that function is distributed after distributing the table + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.non_sense_func_for_generated_always'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- + (function,"{function_propagation_schema,non_sense_func_for_generated_always}",{}) +(1 row) + +COMMIT; +-- Show that functions depending table via rule are also distributed +BEGIN; +CREATE OR REPLACE FUNCTION func_for_rule() +RETURNS int +LANGUAGE plpgsql STABLE AS +$$ +BEGIN + return 4; +END; +$$; + -- Functions shouldn't be propagated within transaction + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_for_rule'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- +(0 rows) + +CREATE TABLE table_1_for_rule(id int, col_1 int); +CREATE TABLE table_2_for_rule(id int, col_1 int); +CREATE RULE rule_1 AS ON UPDATE TO table_1_for_rule DO ALSO UPDATE table_2_for_rule SET col_1 = col_1 * func_for_rule(); +SELECT create_distributed_table('table_1_for_rule','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + + -- Functions should be distributed after distributing the table + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_for_rule'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- + (function,"{function_propagation_schema,func_for_rule}",{}) +(1 row) + +COMMIT; +-- Function should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_for_rule'::regproc::oid;$$) ORDER BY 1,2; + nodename | nodeport | success | result +--------------------------------------------------------------------- + localhost | 57637 | t | (function,"{function_propagation_schema,func_for_rule}",{}) + localhost | 57638 | t | (function,"{function_propagation_schema,func_for_rule}",{}) +(2 rows) + +-- Show that functions as partitioning functions are supported +BEGIN; + CREATE OR REPLACE FUNCTION non_sense_func_for_partitioning(int) + RETURNS int + LANGUAGE plpgsql IMMUTABLE AS + $$ + BEGIN + return 1; + END; + $$; + -- Functions shouldn't be propagated within transaction + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.non_sense_func_for_partitioning'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- +(0 rows) + + CREATE TABLE partitioned_table_to_test_func_prop(id INT, a INT) PARTITION BY RANGE (non_sense_func_for_partitioning(id)); + SELECT create_distributed_table('partitioned_table_to_test_func_prop', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + + -- Show that function is distributed after distributing the table + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.non_sense_func_for_partitioning'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- + (function,"{function_propagation_schema,non_sense_func_for_partitioning}",{integer}) +(1 row) + +COMMIT; +-- Function should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.non_sense_func_for_partitioning'::regproc::oid;$$) ORDER BY 1,2; + nodename | nodeport | success | result +--------------------------------------------------------------------- + localhost | 57637 | t | (function,"{function_propagation_schema,non_sense_func_for_partitioning}",{integer}) + localhost | 57638 | t | (function,"{function_propagation_schema,non_sense_func_for_partitioning}",{integer}) +(2 rows) + +-- Test function dependency on citus local table +BEGIN; + CREATE OR REPLACE FUNCTION func_in_transaction_for_local_table() + RETURNS int + LANGUAGE plpgsql AS + $$ + BEGIN + return 1; + END; + $$; + -- Function shouldn't be propagated within transaction + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_for_local_table'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- +(0 rows) + + CREATE TABLE citus_local_table_to_test_func(l1 int DEFAULT func_in_transaction_for_local_table()); + SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); +NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + + SELECT citus_add_local_table_to_metadata('citus_local_table_to_test_func'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + + -- Function should be marked as distributed after distributing the table that depends on it + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_for_local_table'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- + (function,"{function_propagation_schema,func_in_transaction_for_local_table}",{}) +(1 row) + +ROLLBACK; +-- Show that having a function dependency on exlude also works +BEGIN; + CREATE OR REPLACE FUNCTION exclude_bool_func() + RETURNS boolean + LANGUAGE plpgsql IMMUTABLE AS + $$ + BEGIN + return true; + END; + $$; + -- Functions shouldn't be propagated within transaction + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.exclude_bool_func'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- +(0 rows) + + CREATE TABLE exclusion_func_prop_table (id int, EXCLUDE USING btree (id WITH =) WHERE (exclude_bool_func())); + SELECT create_distributed_table('exclusion_func_prop_table', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + + -- Function should be marked as distributed after distributing the table that depends on it + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.exclude_bool_func'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- + (function,"{function_propagation_schema,exclude_bool_func}",{}) +(1 row) + +COMMIT; +-- Function should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.exclude_bool_func'::regproc::oid;$$) ORDER BY 1,2; + nodename | nodeport | success | result +--------------------------------------------------------------------- + localhost | 57637 | t | (function,"{function_propagation_schema,exclude_bool_func}",{}) + localhost | 57638 | t | (function,"{function_propagation_schema,exclude_bool_func}",{}) +(2 rows) + +-- Show that having a function dependency for index also works +BEGIN; + CREATE OR REPLACE FUNCTION func_for_index_predicate(col_1 int) + RETURNS boolean + LANGUAGE plpgsql IMMUTABLE AS + $$ + BEGIN + return col_1 > 5; + END; + $$; + -- Functions shouldn't be propagated within transaction + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_for_index_predicate'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- +(0 rows) + + CREATE TABLE table_to_check_func_index_dep (id int, col_2 int); + CREATE INDEX on table_to_check_func_index_dep(col_2) WHERE (func_for_index_predicate(col_2)); + SELECT create_distributed_table('table_to_check_func_index_dep', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + + -- Function should be marked as distributed after distributing the table that depends on it + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_for_index_predicate'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- + (function,"{function_propagation_schema,func_for_index_predicate}",{integer}) +(1 row) + +COMMIT; +-- Function should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_for_index_predicate'::regproc::oid;$$) ORDER BY 1,2; + nodename | nodeport | success | result +--------------------------------------------------------------------- + localhost | 57637 | t | (function,"{function_propagation_schema,func_for_index_predicate}",{integer}) + localhost | 57638 | t | (function,"{function_propagation_schema,func_for_index_predicate}",{integer}) +(2 rows) + +-- Test function to function dependency +BEGIN; + CREATE OR REPLACE FUNCTION func_for_func_dep_1() + RETURNS int + LANGUAGE plpgsql IMMUTABLE AS + $$ + BEGIN + return 5; + END; + $$; + CREATE TABLE func_dep_table(a int, b int default func_for_func_dep_1()); + CREATE OR REPLACE FUNCTION func_for_func_dep_2(col_1 func_dep_table) + RETURNS int + LANGUAGE plpgsql IMMUTABLE AS + $$ + BEGIN + return 5; + END; + $$; + SELECT create_distributed_table('func_dep_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + + -- Function should be marked as distributed after distributing the table that depends on it + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_for_func_dep_1'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- + (function,"{function_propagation_schema,func_for_func_dep_1}",{}) +(1 row) + +COMMIT; +-- Function should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_for_func_dep_1'::regproc::oid;$$) ORDER BY 1,2; + nodename | nodeport | success | result +--------------------------------------------------------------------- + localhost | 57637 | t | (function,"{function_propagation_schema,func_for_func_dep_1}",{}) + localhost | 57638 | t | (function,"{function_propagation_schema,func_for_func_dep_1}",{}) +(2 rows) + +-- Test function with SQL language and sequence dependency +BEGIN; + CREATE OR REPLACE FUNCTION func_in_transaction_def_with_seq(val bigint) + RETURNS bigint + LANGUAGE SQL AS + $$ + SELECT 2 * val; + $$; + CREATE OR REPLACE FUNCTION func_in_transaction_def_with_func(val bigint) + RETURNS bigint + LANGUAGE SQL AS + $$ + SELECT func_in_transaction_def_with_seq(val); + $$; + -- Function shouldn't be propagated within transaction + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_def_with_seq'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- +(0 rows) + + CREATE SEQUENCE myseq; + CREATE TABLE table_to_prop_seq_func(id int, col_1 bigint default func_in_transaction_def_with_func(func_in_transaction_def_with_seq(nextval('myseq')))); + SELECT create_distributed_table('table_to_prop_seq_func','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + + -- Function should be marked as distributed after distributing the table that depends on it + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_def_with_seq'::regproc::oid; + pg_identify_object_as_address +--------------------------------------------------------------------- + (function,"{function_propagation_schema,func_in_transaction_def_with_seq}",{bigint}) +(1 row) + +COMMIT; +-- Function should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_def_with_seq'::regproc::oid;$$) ORDER BY 1,2; + nodename | nodeport | success | result +--------------------------------------------------------------------- + localhost | 57637 | t | (function,"{function_propagation_schema,func_in_transaction_def_with_seq}",{bigint}) + localhost | 57638 | t | (function,"{function_propagation_schema,func_in_transaction_def_with_seq}",{bigint}) +(2 rows) + RESET search_path; SET client_min_messages TO WARNING; DROP SCHEMA function_propagation_schema CASCADE; diff --git a/src/test/regress/sql/function_propagation.sql b/src/test/regress/sql/function_propagation.sql index 79168497b..a9a6d04d8 100644 --- a/src/test/regress/sql/function_propagation.sql +++ b/src/test/regress/sql/function_propagation.sql @@ -188,6 +188,483 @@ BEGIN END; $$; +-- Show that functions are propagated (or not) as a dependency + +-- Function as a default column +BEGIN; + CREATE OR REPLACE FUNCTION func_in_transaction_def() + RETURNS int + LANGUAGE plpgsql AS + $$ + BEGIN + return 1; + END; + $$; + + -- Function shouldn't be propagated within transaction + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_def'::regproc::oid; + + CREATE TABLE table_to_prop_func(id int, col_1 int default func_in_transaction_def()); + SELECT create_distributed_table('table_to_prop_func','id'); + + -- Function should be marked as distributed after distributing the table that depends on it + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_def'::regproc::oid; +COMMIT; + +-- Function should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_def'::regproc::oid;$$) ORDER BY 1,2; + + +-- Multiple functions as a default column +BEGIN; + CREATE OR REPLACE FUNCTION func_in_transaction_1() + RETURNS int + LANGUAGE plpgsql AS + $$ + BEGIN + return 1; + END; + $$; + + CREATE OR REPLACE FUNCTION func_in_transaction_2() + RETURNS int + LANGUAGE plpgsql AS + $$ + BEGIN + return 1; + END; + $$; + + -- Functions shouldn't be propagated within transaction + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_1'::regproc::oid; + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_2'::regproc::oid; + + CREATE TABLE table_to_prop_func_2(id int, col_1 int default func_in_transaction_1() + func_in_transaction_2()); + SELECT create_distributed_table('table_to_prop_func_2','id'); + + -- Functions should be marked as distribued after distributing the table that depends on it + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_1'::regproc::oid; + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_2'::regproc::oid; +COMMIT; + +-- Functions should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_1'::regproc::oid;$$) ORDER BY 1,2; +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_2'::regproc::oid;$$) ORDER BY 1,2; + + +-- If function has dependency on non-distributed table it should error out +BEGIN; + CREATE TABLE non_dist_table(id int); + + CREATE OR REPLACE FUNCTION func_in_transaction_3(param_1 non_dist_table) + RETURNS int + LANGUAGE plpgsql AS + $$ + BEGIN + return 1; + END; + $$; + + CREATE TABLE table_to_prop_func_3(id int, col_1 int default func_in_transaction_3(NULL::non_dist_table)); + + -- It should error out as there is a non-distributed table dependency + SELECT create_distributed_table('table_to_prop_func_3','id'); +COMMIT; + + +-- Adding a column with default value should propagate the function +BEGIN; + CREATE TABLE table_to_prop_func_4(id int); + SELECT create_distributed_table('table_to_prop_func_4', 'id'); + + CREATE OR REPLACE FUNCTION func_in_transaction_4() + RETURNS int + LANGUAGE plpgsql AS + $$ + BEGIN + return 1; + END; + $$; + + -- Function shouldn't be propagated within transaction + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_4'::regproc::oid; + + ALTER TABLE table_to_prop_func_4 ADD COLUMN col_1 int default function_propagation_schema.func_in_transaction_4(); + + -- Function should be marked as distributed after adding the column + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_4'::regproc::oid; +COMMIT; + +-- Functions should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_4'::regproc::oid;$$) ORDER BY 1,2; + + +-- Adding multiple columns with default values should propagate the function +BEGIN; + CREATE OR REPLACE FUNCTION func_in_transaction_5() + RETURNS int + LANGUAGE plpgsql AS + $$ + BEGIN + return 1; + END; + $$; + + CREATE OR REPLACE FUNCTION func_in_transaction_6() + RETURNS int + LANGUAGE plpgsql AS + $$ + BEGIN + return 1; + END; + $$; + + + -- Functions shouldn't be propagated within transaction + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_5'::regproc::oid; + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_6'::regproc::oid; + + CREATE TABLE table_to_prop_func_5(id int, col_1 int default func_in_transaction_5(), col_2 int default func_in_transaction_6()); + SELECT create_distributed_table('table_to_prop_func_5', 'id'); + + -- Functions should be marked as distributed after adding the column + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_5'::regproc::oid; + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_6'::regproc::oid; +COMMIT; + +-- Functions should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_5'::regproc::oid;$$) ORDER BY 1,2; +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_6'::regproc::oid;$$) ORDER BY 1,2; + +-- Adding a constraint with function check should propagate the function +BEGIN; + CREATE OR REPLACE FUNCTION func_in_transaction_7(param_1 int) + RETURNS boolean + LANGUAGE plpgsql AS + $$ + BEGIN + return param_1 > 5; + END; + $$; + + -- Functions shouldn't be propagated within transaction + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_7'::regproc::oid; + + CREATE TABLE table_to_prop_func_6(id int, col_1 int check (function_propagation_schema.func_in_transaction_7(col_1))); + SELECT create_distributed_table('table_to_prop_func_6', 'id'); + + -- Function should be marked as distributed after adding the column + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_7'::regproc::oid; +COMMIT; + +-- Function should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_7'::regproc::oid;$$) ORDER BY 1,2; + + +-- Adding a constraint with multiple functions check should propagate the function +BEGIN; + CREATE OR REPLACE FUNCTION func_in_transaction_8(param_1 int) + RETURNS boolean + LANGUAGE plpgsql AS + $$ + BEGIN + return param_1 > 5; + END; + $$; + + CREATE OR REPLACE FUNCTION func_in_transaction_9(param_1 int) + RETURNS boolean + LANGUAGE plpgsql AS + $$ + BEGIN + return param_1 > 5; + END; + $$; + + -- Functions shouldn't be propagated within transaction + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_8'::regproc::oid; + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_9'::regproc::oid; + + CREATE TABLE table_to_prop_func_7(id int, col_1 int check (function_propagation_schema.func_in_transaction_8(col_1) and function_propagation_schema.func_in_transaction_9(col_1))); + SELECT create_distributed_table('table_to_prop_func_7', 'id'); + + -- Function should be marked as distributed after adding the column + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_8'::regproc::oid; + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_9'::regproc::oid; +COMMIT; + +-- Functions should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_8'::regproc::oid;$$) ORDER BY 1,2; +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_9'::regproc::oid;$$) ORDER BY 1,2; + + +-- Adding a column with constraint should propagate the function +BEGIN; + CREATE TABLE table_to_prop_func_8(id int, col_1 int); + SELECT create_distributed_table('table_to_prop_func_8', 'id'); + + CREATE OR REPLACE FUNCTION func_in_transaction_10(param_1 int) + RETURNS boolean + LANGUAGE plpgsql AS + $$ + BEGIN + return param_1 > 5; + END; + $$; + + -- Functions shouldn't be propagated within transaction + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_10'::regproc::oid; + + ALTER TABLE table_to_prop_func_8 ADD CONSTRAINT col1_check CHECK (function_propagation_schema.func_in_transaction_10(col_1)); + + -- Function should be marked as distributed after adding the constraint + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_10'::regproc::oid; +COMMIT; + +-- Function should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_10'::regproc::oid;$$) ORDER BY 1,2; + + +-- If constraint depends on a non-distributed table it should error out +BEGIN; + CREATE TABLE local_table_for_const(id int); + + CREATE OR REPLACE FUNCTION func_in_transaction_11(param_1 int, param_2 local_table_for_const) + RETURNS boolean + LANGUAGE plpgsql AS + $$ + BEGIN + return param_1 > 5; + END; + $$; + + CREATE TABLE table_to_prop_func_9(id int, col_1 int check (func_in_transaction_11(col_1, NULL::local_table_for_const))); + + -- It should error out since there is non-distributed table dependency exists + SELECT create_distributed_table('table_to_prop_func_9', 'id'); +COMMIT; + + +-- Show that function as a part of generated always is supporte +BEGIN; + + CREATE OR REPLACE FUNCTION non_sense_func_for_generated_always() + RETURNS int + LANGUAGE plpgsql IMMUTABLE AS + $$ + BEGIN + return 1; + END; + $$; + + -- Functions shouldn't be propagated within transaction + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.non_sense_func_for_generated_always'::regproc::oid; + + CREATE TABLE people ( + id int, + height_cm numeric, + height_in numeric GENERATED ALWAYS AS (height_cm / non_sense_func_for_generated_always()) STORED); + + SELECT create_distributed_table('people', 'id'); + + -- Show that function is distributed after distributing the table + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.non_sense_func_for_generated_always'::regproc::oid; +COMMIT; + + +-- Show that functions depending table via rule are also distributed +BEGIN; + CREATE OR REPLACE FUNCTION func_for_rule() + RETURNS int + LANGUAGE plpgsql STABLE AS + $$ + BEGIN + return 4; + END; + $$; + + -- Functions shouldn't be propagated within transaction + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_for_rule'::regproc::oid; + + CREATE TABLE table_1_for_rule(id int, col_1 int); + CREATE TABLE table_2_for_rule(id int, col_1 int); + + CREATE RULE rule_1 AS ON UPDATE TO table_1_for_rule DO ALSO UPDATE table_2_for_rule SET col_1 = col_1 * func_for_rule(); + + SELECT create_distributed_table('table_1_for_rule','id'); + + -- Functions should be distributed after distributing the table + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_for_rule'::regproc::oid; +COMMIT; + +-- Function should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_for_rule'::regproc::oid;$$) ORDER BY 1,2; + + +-- Show that functions as partitioning functions are supported +BEGIN; + + CREATE OR REPLACE FUNCTION non_sense_func_for_partitioning(int) + RETURNS int + LANGUAGE plpgsql IMMUTABLE AS + $$ + BEGIN + return 1; + END; + $$; + + -- Functions shouldn't be propagated within transaction + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.non_sense_func_for_partitioning'::regproc::oid; + + CREATE TABLE partitioned_table_to_test_func_prop(id INT, a INT) PARTITION BY RANGE (non_sense_func_for_partitioning(id)); + + SELECT create_distributed_table('partitioned_table_to_test_func_prop', 'id'); + + -- Show that function is distributed after distributing the table + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.non_sense_func_for_partitioning'::regproc::oid; +COMMIT; + +-- Function should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.non_sense_func_for_partitioning'::regproc::oid;$$) ORDER BY 1,2; + + +-- Test function dependency on citus local table +BEGIN; + CREATE OR REPLACE FUNCTION func_in_transaction_for_local_table() + RETURNS int + LANGUAGE plpgsql AS + $$ + BEGIN + return 1; + END; + $$; + + -- Function shouldn't be propagated within transaction + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_for_local_table'::regproc::oid; + + CREATE TABLE citus_local_table_to_test_func(l1 int DEFAULT func_in_transaction_for_local_table()); + SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); + SELECT citus_add_local_table_to_metadata('citus_local_table_to_test_func'); + + -- Function should be marked as distributed after distributing the table that depends on it + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_for_local_table'::regproc::oid; +ROLLBACK; + +-- Show that having a function dependency on exlude also works +BEGIN; + CREATE OR REPLACE FUNCTION exclude_bool_func() + RETURNS boolean + LANGUAGE plpgsql IMMUTABLE AS + $$ + BEGIN + return true; + END; + $$; + + -- Functions shouldn't be propagated within transaction + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.exclude_bool_func'::regproc::oid; + + CREATE TABLE exclusion_func_prop_table (id int, EXCLUDE USING btree (id WITH =) WHERE (exclude_bool_func())); + SELECT create_distributed_table('exclusion_func_prop_table', 'id'); + + -- Function should be marked as distributed after distributing the table that depends on it + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.exclude_bool_func'::regproc::oid; +COMMIT; + +-- Function should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.exclude_bool_func'::regproc::oid;$$) ORDER BY 1,2; + + +-- Show that having a function dependency for index also works +BEGIN; + CREATE OR REPLACE FUNCTION func_for_index_predicate(col_1 int) + RETURNS boolean + LANGUAGE plpgsql IMMUTABLE AS + $$ + BEGIN + return col_1 > 5; + END; + $$; + + -- Functions shouldn't be propagated within transaction + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_for_index_predicate'::regproc::oid; + + CREATE TABLE table_to_check_func_index_dep (id int, col_2 int); + CREATE INDEX on table_to_check_func_index_dep(col_2) WHERE (func_for_index_predicate(col_2)); + + SELECT create_distributed_table('table_to_check_func_index_dep', 'id'); + + -- Function should be marked as distributed after distributing the table that depends on it + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_for_index_predicate'::regproc::oid; +COMMIT; + +-- Function should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_for_index_predicate'::regproc::oid;$$) ORDER BY 1,2; + + +-- Test function to function dependency +BEGIN; + CREATE OR REPLACE FUNCTION func_for_func_dep_1() + RETURNS int + LANGUAGE plpgsql IMMUTABLE AS + $$ + BEGIN + return 5; + END; + $$; + + CREATE TABLE func_dep_table(a int, b int default func_for_func_dep_1()); + + CREATE OR REPLACE FUNCTION func_for_func_dep_2(col_1 func_dep_table) + RETURNS int + LANGUAGE plpgsql IMMUTABLE AS + $$ + BEGIN + return 5; + END; + $$; + + SELECT create_distributed_table('func_dep_table', 'a'); + + -- Function should be marked as distributed after distributing the table that depends on it + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_for_func_dep_1'::regproc::oid; +COMMIT; + +-- Function should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_for_func_dep_1'::regproc::oid;$$) ORDER BY 1,2; + + +-- Test function with SQL language and sequence dependency +BEGIN; + CREATE OR REPLACE FUNCTION func_in_transaction_def_with_seq(val bigint) + RETURNS bigint + LANGUAGE SQL AS + $$ + SELECT 2 * val; + $$; + + CREATE OR REPLACE FUNCTION func_in_transaction_def_with_func(val bigint) + RETURNS bigint + LANGUAGE SQL AS + $$ + SELECT func_in_transaction_def_with_seq(val); + $$; + + -- Function shouldn't be propagated within transaction + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_def_with_seq'::regproc::oid; + + CREATE SEQUENCE myseq; + CREATE TABLE table_to_prop_seq_func(id int, col_1 bigint default func_in_transaction_def_with_func(func_in_transaction_def_with_seq(nextval('myseq')))); + + SELECT create_distributed_table('table_to_prop_seq_func','id'); + + -- Function should be marked as distributed after distributing the table that depends on it + SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_def_with_seq'::regproc::oid; +COMMIT; + +-- Function should be marked as distributed on the worker after committing changes +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction_def_with_seq'::regproc::oid;$$) ORDER BY 1,2; + RESET search_path; SET client_min_messages TO WARNING; DROP SCHEMA function_propagation_schema CASCADE;