diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 7a2ba4997..8b1dd4423 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -37,6 +37,7 @@ #include "distributed/multi_executor.h" #include "distributed/relation_access_tracking.h" #include "distributed/worker_transaction.h" +#include "parser/parse_coerce.h" #include "storage/lmgr.h" #include "utils/builtins.h" #include "utils/fmgrprotos.h" @@ -318,14 +319,7 @@ GetFunctionColocationId(Oid functionOid, char *colocateWithTableName, Oid distributionArgumentOid) { int colocationId = INVALID_COLOCATION_ID; - bool createdColocationGroup = false; - - /* - * Get an exclusive lock on the colocation system catalog. Therefore, we - * can be sure that there will no modifications on the colocation table - * until this transaction is committed. - */ - Relation pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock); + Relation pgDistColocation = heap_open(DistColocationRelationId(), ShareLock); if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0) { @@ -335,11 +329,13 @@ GetFunctionColocationId(Oid functionOid, char *colocateWithTableName, if (colocationId == INVALID_COLOCATION_ID) { - colocationId = - CreateColocationGroup(ShardCount, ShardReplicationFactor, - distributionArgumentOid); + char *functionName = get_func_name(functionOid); - createdColocationGroup = true; + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot distribute the function \"%s\" since there " + "is no table to colocate with", functionName), + errhint("Provide a distributed table via \"colocate_with\" " + "option to create_distributed_function()"))); } } else @@ -353,23 +349,8 @@ GetFunctionColocationId(Oid functionOid, char *colocateWithTableName, colocationId = TableColocationId(sourceRelationId); } - /* - * If we created a new colocation group then we need to keep the lock to - * 
prevent a concurrent create_distributed_table call from creating another - * colocation group with the same parameters. If we're using an existing - * colocation group then other transactions will use the same one. - */ - if (createdColocationGroup) - { - /* keep the exclusive lock */ - heap_close(pgDistColocation, NoLock); - } - else - { - /* release the exclusive lock */ - heap_close(pgDistColocation, ExclusiveLock); - } - + /* keep the lock */ + heap_close(pgDistColocation, NoLock); return colocationId; } @@ -415,15 +396,31 @@ EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnTyp "\"citus.replication_model\" is set to \"streaming\""))); } + /* + * If the types are the same, we're good. If not, we still check if there + * is any coercion path between the types. + */ sourceDistributionColumnType = sourceDistributionColumn->vartype; if (sourceDistributionColumnType != distributionColumnType) { - char *functionName = get_func_name(functionOid); - char *sourceRelationName = get_rel_name(sourceRelationId); + Oid coercionFuncId = InvalidOid; + CoercionPathType coercionType = COERCION_PATH_NONE; - ereport(ERROR, (errmsg("cannot colocate function \"%s\" and table \"%s\" " - "because distribution column types don't match", - sourceRelationName, functionName))); + coercionType = + find_coercion_pathway(distributionColumnType, sourceDistributionColumnType, + COERCION_EXPLICIT, &coercionFuncId); + + /* if there is no path for coercion, error out*/ + if (coercionType == COERCION_PATH_NONE) + { + char *functionName = get_func_name(functionOid); + char *sourceRelationName = get_rel_name(sourceRelationId); + + ereport(ERROR, (errmsg("cannot colocate function \"%s\" and table \"%s\" " + "because distribution column types don't match and " + "there is no coercion path", sourceRelationName, + functionName))); + } } } diff --git a/src/test/regress/expected/distributed_functions.out b/src/test/regress/expected/distributed_functions.out index 
63a4cbc3e..ec0f81d73 100644 --- a/src/test/regress/expected/distributed_functions.out +++ b/src/test/regress/expected/distributed_functions.out @@ -18,6 +18,21 @@ CREATE FUNCTION add(integer, integer) RETURNS integer LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; +CREATE FUNCTION add_numeric(numeric, numeric) RETURNS numeric + AS 'select $1 + $2;' + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; +CREATE FUNCTION add_text(text, text) RETURNS int + AS 'select $1::int + $2::int;' + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; +CREATE FUNCTION add_polygons(polygon, polygon) RETURNS int + AS 'select 1' + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; -- Test some combination of functions without ddl propagation -- This will prevent the workers from having those types created. They are -- created just-in-time on function distribution @@ -225,7 +240,8 @@ SELECT create_distributed_function('add_with_param_names(int, int)', '$1', coloc ERROR: cannot colocate function "add_with_param_names" and table "replicated_table_func_test" DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model. 
HINT: When distributing tables make sure that "citus.replication_model" is set to "streaming" --- a function cannot be colocated with a different distribution argument type +-- a function can be colocated with a different distribution argument type +-- as long as there is a coercion path SET citus.shard_replication_factor TO 1; CREATE TABLE replicated_table_func_test_2 (a bigint); SET citus.replication_model TO "streaming"; @@ -236,7 +252,11 @@ SELECT create_distributed_table('replicated_table_func_test_2', 'a'); (1 row) SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_2'); -ERROR: cannot colocate function "replicated_table_func_test_2" and table "add_with_param_names" because distribution column types don't match + create_distributed_function +----------------------------- + +(1 row) + -- colocate_with cannot be used without distribution key SELECT create_distributed_function('add_with_param_names(int, int)', colocate_with:='replicated_table_func_test_2'); ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid @@ -297,6 +317,50 @@ WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass t (1 row) +-- function with a numeric dist. arg can be colocated with int +-- column of a distributed table. 
In general, if there is a coercion +-- path, we rely on postgres for implicit coercions, and users for explicit coercions +-- to coerce the values +SELECT create_distributed_function('add_numeric(numeric, numeric)', '$1', colocate_with:='replicated_table_func_test_4'); + create_distributed_function +----------------------------- + +(1 row) + +SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated +FROM pg_dist_partition, citus.pg_dist_object as objects +WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND + objects.objid = 'add_numeric(numeric, numeric)'::regprocedure; + table_and_function_colocated +------------------------------ + t +(1 row) + +SELECT create_distributed_function('add_text(text, text)', '$1', colocate_with:='replicated_table_func_test_4'); + create_distributed_function +----------------------------- + +(1 row) + +SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated +FROM pg_dist_partition, citus.pg_dist_object as objects +WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND + objects.objid = 'add_text(text, text)'::regprocedure; + table_and_function_colocated +------------------------------ + t +(1 row) + +-- cannot distribute function because there is no +-- coercion path from polygon to int +SELECT create_distributed_function('add_polygons(polygon,polygon)', '$1', colocate_with:='replicated_table_func_test_4'); +ERROR: cannot colocate function "replicated_table_func_test_4" and table "add_polygons" because distribution column types don't match and there is no coercion path +-- without the colocate_with, the function errors out since there is no +-- default colocation group +SET citus.shard_count TO 55; +SELECT create_distributed_function('add_with_param_names(int, int)', 'val1'); +ERROR: cannot distribute the function "add_with_param_names" since there is no table to colocate with +HINT: Provide a
distributed table via "colocate_with" option to create_distributed_function() -- clear objects SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary'; stop_metadata_sync_to_node diff --git a/src/test/regress/sql/distributed_functions.sql b/src/test/regress/sql/distributed_functions.sql index 35a23782a..88985e04e 100644 --- a/src/test/regress/sql/distributed_functions.sql +++ b/src/test/regress/sql/distributed_functions.sql @@ -15,6 +15,24 @@ CREATE FUNCTION add(integer, integer) RETURNS integer IMMUTABLE RETURNS NULL ON NULL INPUT; +CREATE FUNCTION add_numeric(numeric, numeric) RETURNS numeric + AS 'select $1 + $2;' + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; + +CREATE FUNCTION add_text(text, text) RETURNS int + AS 'select $1::int + $2::int;' + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; + +CREATE FUNCTION add_polygons(polygon, polygon) RETURNS int + AS 'select 1' + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; + -- Test some combination of functions without ddl propagation -- This will prevent the workers from having those types created.
They are -- created just-in-time on function distribution @@ -127,7 +145,8 @@ SET citus.replication_model TO "statement"; SELECT create_distributed_table('replicated_table_func_test', 'a'); SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test'); --- a function cannot be colocated with a different distribution argument type +-- a function can be colocated with a different distribution argument type +-- as long as there is a coercion path SET citus.shard_replication_factor TO 1; CREATE TABLE replicated_table_func_test_2 (a bigint); SET citus.replication_model TO "streaming"; @@ -166,6 +185,31 @@ FROM pg_dist_partition, citus.pg_dist_object as objects WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND objects.objid = 'add_with_param_names(int, int)'::regprocedure; +-- function with a numeric dist. arg can be colocated with int +-- column of a distributed table. In general, if there is a coercion +-- path, we rely on postgres for implicit coercions, and users for explicit coercions +-- to coerce the values +SELECT create_distributed_function('add_numeric(numeric, numeric)', '$1', colocate_with:='replicated_table_func_test_4'); +SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated +FROM pg_dist_partition, citus.pg_dist_object as objects +WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND + objects.objid = 'add_numeric(numeric, numeric)'::regprocedure; + +SELECT create_distributed_function('add_text(text, text)', '$1', colocate_with:='replicated_table_func_test_4'); +SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated +FROM pg_dist_partition, citus.pg_dist_object as objects +WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND + objects.objid = 'add_text(text, text)'::regprocedure; + +-- cannot distribute function because there is
no +-- coercion path from polygon to int +SELECT create_distributed_function('add_polygons(polygon,polygon)', '$1', colocate_with:='replicated_table_func_test_4'); + +-- without the colocate_with, the function errors out since there is no +-- default colocation group +SET citus.shard_count TO 55; +SELECT create_distributed_function('add_with_param_names(int, int)', 'val1'); + -- clear objects SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary';