diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index e688176bf..26dc5528e 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -16,21 +16,43 @@ */ #include "postgres.h" +#include "funcapi.h" +#include "access/htup_details.h" #include "access/xact.h" #include "catalog/namespace.h" #include "catalog/pg_proc.h" #include "distributed/commands.h" +#include "catalog/pg_type.h" +#include "distributed/colocation_utils.h" +#include "distributed/master_protocol.h" #include "distributed/metadata_sync.h" #include "distributed/metadata/distobject.h" +#include "distributed/metadata/pg_dist_object.h" #include "distributed/multi_executor.h" #include "distributed/relation_access_tracking.h" #include "distributed/worker_transaction.h" #include "utils/builtins.h" #include "utils/fmgrprotos.h" +#include "utils/fmgroids.h" +#include "utils/lsyscache.h" +#include "utils/syscache.h" + +#define argumentStartsWith(arg, prefix) \ + (strncmp((arg), (prefix), strlen(prefix)) == 0) /* forward declaration for helper functions*/ static char * GetFunctionDDLCommand(const RegProcedure funcOid); +static int GetDistributionArgIndex(Oid functionOid, char *distributionArgumentName, + Oid *distributionArgumentOid); +static int GetFunctionColocationId(Oid functionOid, char *colocateWithTableName, Oid + distributionArgumentOid); +static void EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid + distributionColumnType, Oid + sourceRelationId); +static void UpdateFunctionDistributionInfo(const ObjectAddress *distAddress, + int *distribution_argument_index, + int *colocationId); static void EnsureSequentialModeForFunctionDDL(void); PG_FUNCTION_INFO_V1(create_distributed_function); @@ -43,19 +65,57 @@ Datum create_distributed_function(PG_FUNCTION_ARGS) { RegProcedure funcOid = PG_GETARG_OID(0); + + text *distributionArgumentNameText = NULL; /* optional */ + text *colocateWithText = NULL; /* optional 
*/ + const char *ddlCommand = NULL; ObjectAddress functionAddress = { 0 }; + int distributionArgumentIndex = -1; + Oid distributionArgumentOid = InvalidOid; + int colocationId = -1; + + char *distributionArgumentName = NULL; + char *colocateWithTableName = NULL; + /* if called on NULL input, error out */ if (funcOid == InvalidOid) { - ereport(ERROR, (errmsg("create_distributed_function() requires a single " - "parameter that is a valid function or procedure name " + ereport(ERROR, (errmsg("the first parameter for create_distributed_function() " + "should be a single a valid function or procedure name " "followed by a list of parameters in parantheses"), errhint("skip the parameters with OUT argtype as they are not " "part of the signature in PostgreSQL"))); } + if (PG_ARGISNULL(1)) + { + /* + * Using the default value, so distribute the function but do not set + * the distribution argument. + */ + distributionArgumentName = NULL; + } + else + { + distributionArgumentNameText = PG_GETARG_TEXT_P(1); + distributionArgumentName = text_to_cstring(distributionArgumentNameText); + } + + if (PG_ARGISNULL(2)) + { + ereport(ERROR, (errmsg("colocate_with parameter should not be NULL"), + errhint("To use the default value, set colocate_with option " + "to \"default\""))); + } + else + { + colocateWithText = PG_GETARG_TEXT_P(2); + colocateWithTableName = text_to_cstring(colocateWithText); + } + + ObjectAddressSet(functionAddress, ProcedureRelationId, funcOid); /* @@ -71,6 +131,42 @@ create_distributed_function(PG_FUNCTION_ARGS) MarkObjectDistributed(&functionAddress); + if (distributionArgumentName == NULL) + { + /* cannot provide colocate_with without distribution_arg_name */ + if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0) + { + char *functionName = get_func_name(funcOid); + + + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot distribute the function \"%s\" since the " + "distribution argument is not valid ", functionName), 
errhint("To provide \"colocate_with\" option, the" + " distribution argument parameter should also " + "be provided"))); + } + + /* set distribution argument and colocationId to NULL */ + UpdateFunctionDistributionInfo(&functionAddress, NULL, NULL); + } + else + { + /* get the argument index, or error out if we cannot find a valid index */ + distributionArgumentIndex = + GetDistributionArgIndex(funcOid, distributionArgumentName, + &distributionArgumentOid); + + /* get the colocation id, or error out if we cannot find an appropriate one */ + colocationId = + GetFunctionColocationId(funcOid, colocateWithTableName, + distributionArgumentOid); + + /* a distribution argument was given, so record its index and the colocationId */ + UpdateFunctionDistributionInfo(&functionAddress, &distributionArgumentIndex, + &colocationId); + } + PG_RETURN_VOID(); } @@ -91,6 +187,314 @@ CreateFunctionDDLCommandsIdempotent(const ObjectAddress *functionAddress) } +/* + * GetDistributionArgIndex resolves the 0-based index and type OID of the distribution + * argument. The function errors out if no valid argument is found. 
 + */ +static int +GetDistributionArgIndex(Oid functionOid, char *distributionArgumentName, + Oid *distributionArgumentOid) +{ + int distributionArgumentIndex = -1; + + int numberOfArgs = 0; + int argIndex = 0; + Oid *argTypes = NULL; + char **argNames = NULL; + char *argModes = NULL; + + HeapTuple proctup = NULL; + + *distributionArgumentOid = InvalidOid; + + proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(functionOid)); + if (!HeapTupleIsValid(proctup)) + { + elog(ERROR, "cache lookup failed for function %u", functionOid); + } + + numberOfArgs = get_func_arg_info(proctup, &argTypes, &argNames, &argModes); + + if (argumentStartsWith(distributionArgumentName, "$")) + { + /* skip the first character, we're safe because text_to_cstring pallocs */ + distributionArgumentName++; + + /* throws error if the input is not an integer */ + distributionArgumentIndex = pg_atoi(distributionArgumentName, 4, 0); + + if (distributionArgumentIndex < 1 || distributionArgumentIndex > numberOfArgs) + { + char *functionName = get_func_name(functionOid); + + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot distribute the function \"%s\" since " + "the distribution argument is not valid", + functionName), + errhint("Either provide a valid function argument name " + "or a valid \"$paramIndex\" to " + "create_distributed_function()"))); + } + + /* + * Internal representation for the distributionArgumentIndex + * starts from 0 whereas user facing API starts from 1. + */ + distributionArgumentIndex -= 1; + *distributionArgumentOid = argTypes[distributionArgumentIndex]; + + ReleaseSysCache(proctup); + + Assert(*distributionArgumentOid != InvalidOid); + + return distributionArgumentIndex; + } + + /* + * The user didn't provide "$paramIndex" but potentially the name of the parameter. + * So, loop over the arguments and try to find the argument name that matches + * the parameter that user provided. 
 + */ + for (argIndex = 0; argIndex < numberOfArgs; ++argIndex) + { + char *argNameOnIndex = argNames != NULL ? argNames[argIndex] : NULL; + + if (argNameOnIndex != NULL && + pg_strncasecmp(argNameOnIndex, distributionArgumentName, NAMEDATALEN) == 0) + { + distributionArgumentIndex = argIndex; + + *distributionArgumentOid = argTypes[argIndex]; + + /* we found a match, no need to continue */ + break; + } + } + + /* we still couldn't find the argument, so error out */ + if (distributionArgumentIndex == -1) + { + char *functionName = get_func_name(functionOid); + + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot distribute the function \"%s\" since the " + "distribution argument is not valid ", functionName), + errhint("Either provide a valid function argument name " + "or a valid \"$paramIndex\" to " + "create_distributed_function()"))); + } + + ReleaseSysCache(proctup); + + Assert(*distributionArgumentOid != InvalidOid); + + return distributionArgumentIndex; +} + + +/* + * GetFunctionColocationId gets the parameters for deciding the colocationId + * of the function that is being distributed. The function errors out if it is + * not possible to assign a colocationId to the input function. + */ +static int +GetFunctionColocationId(Oid functionOid, char *colocateWithTableName, + Oid distributionArgumentOid) +{ + int colocationId = INVALID_COLOCATION_ID; + bool createdColocationGroup = false; + + /* + * Get an exclusive lock on the colocation system catalog. Therefore, we + * can be sure that there will be no modifications on the colocation table + * until this transaction is committed. 
 + */ + Relation pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock); + + if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0) + { + /* check for default colocation group */ + colocationId = ColocationId(ShardCount, ShardReplicationFactor, + distributionArgumentOid); + + if (colocationId == INVALID_COLOCATION_ID) + { + colocationId = + CreateColocationGroup(ShardCount, ShardReplicationFactor, + distributionArgumentOid); + + createdColocationGroup = true; + } + } + else + { + Oid sourceRelationId = + ResolveRelationId(cstring_to_text(colocateWithTableName), false); + + EnsureFunctionCanBeColocatedWithTable(functionOid, distributionArgumentOid, + sourceRelationId); + + colocationId = TableColocationId(sourceRelationId); + } + + /* + * If we created a new colocation group then we need to keep the lock to + * prevent a concurrent create_distributed_table call from creating another + * colocation group with the same parameters. If we're using an existing + * colocation group then other transactions will use the same one. + */ + if (createdColocationGroup) + { + /* keep the exclusive lock */ + heap_close(pgDistColocation, NoLock); + } + else + { + /* release the exclusive lock */ + heap_close(pgDistColocation, ExclusiveLock); + } + + + return colocationId; +} + + +/* + * EnsureFunctionCanBeColocatedWithTable checks whether the given arguments are + * suitable to distribute the function to be colocated with the given source table. 
+ */ +static void +EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnType, + Oid sourceRelationId) +{ + DistTableCacheEntry *sourceTableEntry = DistributedTableCacheEntry(sourceRelationId); + char sourceDistributionMethod = sourceTableEntry->partitionMethod; + char sourceReplicationModel = sourceTableEntry->replicationModel; + Var *sourceDistributionColumn = DistPartitionKey(sourceRelationId); + Oid sourceDistributionColumnType = InvalidOid; + + if (sourceDistributionMethod != DISTRIBUTE_BY_HASH) + { + char *functionName = get_func_name(functionOid); + char *sourceRelationName = get_rel_name(sourceRelationId); + + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot colocate function \"%s\" and table \"%s\" because " + "colocate_with option is only supported for hash " + "distributed tables.", functionName, + sourceRelationName))); + } + + if (sourceReplicationModel != REPLICATION_MODEL_STREAMING) + { + char *functionName = get_func_name(functionOid); + char *sourceRelationName = get_rel_name(sourceRelationId); + + ereport(ERROR, (errmsg("cannot colocate function \"%s\" and table \"%s\"", + functionName, sourceRelationName), + errdetail("Citus currently only supports colocating function " + "with distributed tables that are created using " + "streaming replication model."), + errhint("When distributing tables make sure that " + "\"citus.replication_model\" is set to \"streaming\""))); + } + + sourceDistributionColumnType = sourceDistributionColumn->vartype; + if (sourceDistributionColumnType != distributionColumnType) + { + char *functionName = get_func_name(functionOid); + char *sourceRelationName = get_rel_name(sourceRelationId); + + ereport(ERROR, (errmsg("cannot colocate function \"%s\" and table \"%s\" " + "because distribution column types don't match", + sourceRelationName, functionName))); + } +} + + +/* + * UpdateFunctionDistributionInfo gets object address of a function and + * updates its 
distribution_argument_index and colocationId in pg_dist_object. + */ +static void +UpdateFunctionDistributionInfo(const ObjectAddress *distAddress, + int *distribution_argument_index, + int *colocationId) +{ + const bool indexOK = true; + + Relation pgDistObjectRel = NULL; + TupleDesc tupleDescriptor = NULL; + ScanKeyData scanKey[3]; + SysScanDesc scanDescriptor = NULL; + HeapTuple heapTuple = NULL; + Datum values[Natts_pg_dist_object]; + bool isnull[Natts_pg_dist_object]; + bool replace[Natts_pg_dist_object]; + + pgDistObjectRel = heap_open(DistObjectRelationId(), RowExclusiveLock); + tupleDescriptor = RelationGetDescr(pgDistObjectRel); + + /* scan pg_dist_object for classid = $1 AND objid = $2 AND objsubid = $3 via index */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_object_classid, BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(distAddress->classId)); + ScanKeyInit(&scanKey[1], Anum_pg_dist_object_objid, BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(distAddress->objectId)); + ScanKeyInit(&scanKey[2], Anum_pg_dist_object_objsubid, BTEqualStrategyNumber, + F_INT4EQ, ObjectIdGetDatum(distAddress->objectSubId)); + + scanDescriptor = systable_beginscan(pgDistObjectRel, DistObjectPrimaryKeyIndexId(), + indexOK, + NULL, 3, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find valid entry for node \"%d,%d,%d\" " + "in pg_dist_object", distAddress->classId, + distAddress->objectId, distAddress->objectSubId))); + } + + memset(replace, 0, sizeof(replace)); + + replace[Anum_pg_dist_object_distribution_argument_index - 1] = true; + + if (distribution_argument_index != NULL) + { + values[Anum_pg_dist_object_distribution_argument_index - 1] = Int32GetDatum( + *distribution_argument_index); + isnull[Anum_pg_dist_object_distribution_argument_index - 1] = false; + } + else + { + isnull[Anum_pg_dist_object_distribution_argument_index - 1] = true; + } + + 
replace[Anum_pg_dist_object_colocationid - 1] = true; + if (colocationId != NULL) + { + values[Anum_pg_dist_object_colocationid - 1] = Int32GetDatum(*colocationId); + isnull[Anum_pg_dist_object_colocationid - 1] = false; + } + else + { + isnull[Anum_pg_dist_object_colocationid - 1] = true; + } + + heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); + + CatalogTupleUpdate(pgDistObjectRel, &heapTuple->t_self, heapTuple); + + CitusInvalidateRelcacheByRelid(DistObjectRelationId()); + + CommandCounterIncrement(); + + systable_endscan(scanDescriptor); + + heap_close(pgDistObjectRel, NoLock); +} + + /* * GetFunctionDDLCommand returns the complete "CREATE OR REPLACE FUNCTION ..." statement for * the specified function. diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c index 0e034bbb2..b9d23f950 100644 --- a/src/backend/distributed/metadata/distobject.c +++ b/src/backend/distributed/metadata/distobject.c @@ -37,7 +37,6 @@ static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes, Datum *paramValues); - PG_FUNCTION_INFO_V1(master_unmark_object_distributed); @@ -151,7 +150,10 @@ MarkObjectDistributed(const ObjectAddress *distAddress) /* - * ExecuteCommandAsSuperuser executes a command via SPI as superuser. + * ExecuteCommandAsSuperuser executes a command via SPI as superuser. Using this + * function (and in general SPI/SQL with superuser) should be avoided as much as + * possible. This is to prevent any user to exploit the superuser access via + * triggers. 
*/ static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes, diff --git a/src/backend/distributed/sql/udfs/create_distributed_function/9.0-1.sql b/src/backend/distributed/sql/udfs/create_distributed_function/9.0-1.sql index 8b7cc457b..aa43ad287 100644 --- a/src/backend/distributed/sql/udfs/create_distributed_function/9.0-1.sql +++ b/src/backend/distributed/sql/udfs/create_distributed_function/9.0-1.sql @@ -1,7 +1,11 @@ -CREATE OR REPLACE FUNCTION create_distributed_function(function_name regprocedure) +CREATE OR REPLACE FUNCTION create_distributed_function(function_name regprocedure, + distribution_arg_name text DEFAULT NULL, + colocate_with text DEFAULT 'default') RETURNS void LANGUAGE C CALLED ON NULL INPUT AS 'MODULE_PATHNAME', $$create_distributed_function$$; -COMMENT ON FUNCTION create_distributed_function(function_name regprocedure) +COMMENT ON FUNCTION create_distributed_function(function_name regprocedure, + distribution_arg_name text, + colocate_with text) IS 'creates a distributed function'; diff --git a/src/backend/distributed/sql/udfs/create_distributed_function/latest.sql b/src/backend/distributed/sql/udfs/create_distributed_function/latest.sql index 8b7cc457b..aa43ad287 100644 --- a/src/backend/distributed/sql/udfs/create_distributed_function/latest.sql +++ b/src/backend/distributed/sql/udfs/create_distributed_function/latest.sql @@ -1,7 +1,11 @@ -CREATE OR REPLACE FUNCTION create_distributed_function(function_name regprocedure) +CREATE OR REPLACE FUNCTION create_distributed_function(function_name regprocedure, + distribution_arg_name text DEFAULT NULL, + colocate_with text DEFAULT 'default') RETURNS void LANGUAGE C CALLED ON NULL INPUT AS 'MODULE_PATHNAME', $$create_distributed_function$$; -COMMENT ON FUNCTION create_distributed_function(function_name regprocedure) +COMMENT ON FUNCTION create_distributed_function(function_name regprocedure, + distribution_arg_name text, + colocate_with text) IS 'creates a distributed 
function'; diff --git a/src/test/regress/expected/distributed_functions.out b/src/test/regress/expected/distributed_functions.out index 173302670..4c2c08125 100644 --- a/src/test/regress/expected/distributed_functions.out +++ b/src/test/regress/expected/distributed_functions.out @@ -18,7 +18,7 @@ CREATE FUNCTION add(integer, integer) RETURNS integer LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; -SELECT create_distributed_function('add(int,int)'); +SELECT create_distributed_function('add(int,int)', '$1'); create_distributed_function ----------------------------- @@ -39,7 +39,7 @@ CREATE TYPE dup_result AS (f1 int, f2 text); CREATE FUNCTION dup(int) RETURNS dup_result AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$ LANGUAGE SQL; -SELECT create_distributed_function('dup(int)'); +SELECT create_distributed_function('dup(int)', '$1'); create_distributed_function ----------------------------- @@ -52,6 +52,195 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY localhost | 57638 | t | (42,"42 is text") (2 rows) +CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer + AS 'select $1 + $2;' + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; +CREATE FUNCTION add_without_param_names(integer, integer) RETURNS integer + AS 'select $1 + $2;' + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; +CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer + AS 'select $1 + $2;' + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; +-- postgres doesn't accept parameter names in the regprocedure input +SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1'); +ERROR: syntax error at or near "int" +LINE 1: SELECT create_distributed_function('add_with_param_names(val... 
+ ^ +CONTEXT: invalid type name "val1 int" +-- invalid distribution_arg_name +SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='test'); +ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid +HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function() +SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='int'); +ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid +HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function() +-- invalid distribution_arg_index +SELECT create_distributed_function('add_with_param_names(int, int)', '$0'); +ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid +HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function() +SELECT create_distributed_function('add_with_param_names(int, int)', '$-1'); +ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid +HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function() +SELECT create_distributed_function('add_with_param_names(int, int)', '$-10'); +ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid +HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function() +SELECT create_distributed_function('add_with_param_names(int, int)', '$3'); +ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid +HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function() +SELECT create_distributed_function('add_with_param_names(int, 
int)', '$1a'); +ERROR: invalid input syntax for integer: "1a" +-- non existing column name +SELECT create_distributed_function('add_with_param_names(int, int)', 'aaa'); +ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid +HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function() +-- NULL function +SELECT create_distributed_function(NULL); +ERROR: the first parameter for create_distributed_function() should be a single a valid function or procedure name followed by a list of parameters in parantheses +HINT: skip the parameters with OUT argtype as they are not part of the signature in PostgreSQL +-- NULL colocate_with +SELECT create_distributed_function('add_with_param_names(int, int)', '$1', NULL); +ERROR: colocate_with parameter should not be NULL +HINT: To use the default value, set colocate_with option to "default" +-- empty string distribution_arg_index +SELECT create_distributed_function('add_with_param_names(int, int)', ''); +ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid +HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function() +-- valid distribution with distribution_arg_name +SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1'); + create_distributed_function +----------------------------- + +(1 row) + +-- valid distribution with distribution_arg_name -- case insensitive +SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='VaL1'); + create_distributed_function +----------------------------- + +(1 row) + +-- valid distribution with distribution_arg_index +SELECT create_distributed_function('add_with_param_names(int, int)','$1'); + create_distributed_function +----------------------------- + +(1 row) + +-- a function cannot be colocated with a table that is 
not "streaming" replicated +SET citus.shard_replication_factor TO 2; +CREATE TABLE replicated_table_func_test (a int); +SET citus.replication_model TO "statement"; +SELECT create_distributed_table('replicated_table_func_test', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test'); +ERROR: cannot colocate function "add_with_param_names" and table "replicated_table_func_test" +DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model. +HINT: When distributing tables make sure that "citus.replication_model" is set to "streaming" +-- a function cannot be colocated with a different distribution argument type +SET citus.shard_replication_factor TO 1; +CREATE TABLE replicated_table_func_test_2 (a bigint); +SET citus.replication_model TO "streaming"; +SELECT create_distributed_table('replicated_table_func_test_2', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_2'); +ERROR: cannot colocate function "replicated_table_func_test_2" and table "add_with_param_names" because distribution column types don't match +-- colocate_with cannot be used without distribution key +SELECT create_distributed_function('add_with_param_names(int, int)', colocate_with:='replicated_table_func_test_2'); +ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid +HINT: To provide "colocate_with" option, the distribution argument parameter should also be provided +-- a function cannot be colocated with a local table +CREATE TABLE replicated_table_func_test_3 (a bigint); +SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_3'); 
+ERROR: relation replicated_table_func_test_3 is not distributed +-- a function cannot be colocated with a reference table +SELECT create_reference_table('replicated_table_func_test_3'); + create_reference_table +------------------------ + +(1 row) + +SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_3'); +ERROR: cannot colocate function "add_with_param_names" and table "replicated_table_func_test_3" because colocate_with option is only supported for hash distributed tables. +-- finally, colocate the function with a distributed table +SET citus.shard_replication_factor TO 1; +CREATE TABLE replicated_table_func_test_4 (a int); +SET citus.replication_model TO "streaming"; +SELECT create_distributed_table('replicated_table_func_test_4', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test_4'); + create_distributed_function +----------------------------- + +(1 row) + +-- show that the colocationIds are the same +SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated +FROM pg_dist_partition, citus.pg_dist_object as objects +WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND + objects.objid = 'add_with_param_names(int, int)'::regprocedure; + table_and_function_colocated +------------------------------ + t +(1 row) + +-- now, re-distributed with the default colocation option, we should still see that the same colocation +-- group preserved, because we're using the default shard creationg settings +SELECT create_distributed_function('add_with_param_names(int, int)', 'val1'); + create_distributed_function +----------------------------- + +(1 row) + +SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated +FROM pg_dist_partition, citus.pg_dist_object as objects 
+WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND + objects.objid = 'add_with_param_names(int, int)'::regprocedure; + table_and_function_colocated +------------------------------ + t +(1 row) + +-- if not paremeters are supplied, we'd see that function doesn't have +-- distribution_argument_index and colocationid +SELECT create_distributed_function('add_mixed_param_names(int, int)'); + create_distributed_function +----------------------------- + +(1 row) + +SELECT distribution_argument_index is NULL, colocationid is NULL from citus.pg_dist_object +WHERE objid = 'add_mixed_param_names(int, int)'::regprocedure; + ?column? | ?column? +----------+---------- + t | t +(1 row) + +-- also show that we can use the function +SELECT * FROM run_command_on_workers('SELECT function_tests.add_mixed_param_names(2,3);') ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+-------- + localhost | 57637 | t | 5 + localhost | 57638 | t | 5 +(2 rows) + -- clear objects SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA function_tests CASCADE; diff --git a/src/test/regress/expected/isolation_ensure_dependency_activate_node.out b/src/test/regress/expected/isolation_ensure_dependency_activate_node.out index c79f53108..1180a3f69 100644 --- a/src/test/regress/expected/isolation_ensure_dependency_activate_node.out +++ b/src/test/regress/expected/isolation_ensure_dependency_activate_node.out @@ -1806,7 +1806,7 @@ step s2-public-schema: step s2-distribute-function: CREATE OR REPLACE FUNCTION add (INT,INT) RETURNS INT AS $$ SELECT $1 + $2 $$ LANGUAGE SQL; - SELECT create_distributed_function('add(INT,INT)'); + SELECT create_distributed_function('add(INT,INT)', '$1'); step s1-commit: COMMIT; @@ -1921,7 +1921,7 @@ step s2-public-schema: step s2-distribute-function: CREATE OR REPLACE FUNCTION add (INT,INT) RETURNS INT AS $$ SELECT $1 + $2 $$ LANGUAGE SQL; - SELECT 
create_distributed_function('add(INT,INT)'); + SELECT create_distributed_function('add(INT,INT)', '$1'); create_distributed_function @@ -2045,7 +2045,7 @@ step s2-create-schema: step s2-distribute-function: CREATE OR REPLACE FUNCTION add (INT,INT) RETURNS INT AS $$ SELECT $1 + $2 $$ LANGUAGE SQL; - SELECT create_distributed_function('add(INT,INT)'); + SELECT create_distributed_function('add(INT,INT)', '$1'); create_distributed_function diff --git a/src/test/regress/specs/isolation_ensure_dependency_activate_node.spec b/src/test/regress/specs/isolation_ensure_dependency_activate_node.spec index f09df23e7..7c6d00eef 100644 --- a/src/test/regress/specs/isolation_ensure_dependency_activate_node.spec +++ b/src/test/regress/specs/isolation_ensure_dependency_activate_node.spec @@ -117,7 +117,7 @@ step "s2-create-table-with-type" step "s2-distribute-function" { CREATE OR REPLACE FUNCTION add (INT,INT) RETURNS INT AS $$ SELECT $1 + $2 $$ LANGUAGE SQL; - SELECT create_distributed_function('add(INT,INT)'); + SELECT create_distributed_function('add(INT,INT)', '$1'); } step "s2-begin" diff --git a/src/test/regress/sql/distributed_functions.sql b/src/test/regress/sql/distributed_functions.sql index c09132913..2dc6f7a42 100644 --- a/src/test/regress/sql/distributed_functions.sql +++ b/src/test/regress/sql/distributed_functions.sql @@ -15,7 +15,7 @@ CREATE FUNCTION add(integer, integer) RETURNS integer LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; -SELECT create_distributed_function('add(int,int)'); +SELECT create_distributed_function('add(int,int)', '$1'); SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; @@ -30,12 +30,120 @@ CREATE FUNCTION dup(int) RETURNS dup_result AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$ LANGUAGE SQL; -SELECT create_distributed_function('dup(int)'); +SELECT create_distributed_function('dup(int)', '$1'); SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2; +CREATE FUNCTION 
add_with_param_names(val1 integer, val2 integer) RETURNS integer + AS 'select $1 + $2;' + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; + +CREATE FUNCTION add_without_param_names(integer, integer) RETURNS integer + AS 'select $1 + $2;' + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; + +CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer + AS 'select $1 + $2;' + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; + +-- postgres doesn't accept parameter names in the regprocedure input +SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1'); + +-- invalid distribution_arg_name +SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='test'); +SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='int'); + +-- invalid distribution_arg_index +SELECT create_distributed_function('add_with_param_names(int, int)', '$0'); +SELECT create_distributed_function('add_with_param_names(int, int)', '$-1'); +SELECT create_distributed_function('add_with_param_names(int, int)', '$-10'); +SELECT create_distributed_function('add_with_param_names(int, int)', '$3'); +SELECT create_distributed_function('add_with_param_names(int, int)', '$1a'); + +-- non existing column name +SELECT create_distributed_function('add_with_param_names(int, int)', 'aaa'); + +-- NULL function +SELECT create_distributed_function(NULL); + +-- NULL colocate_with +SELECT create_distributed_function('add_with_param_names(int, int)', '$1', NULL); + +-- empty string distribution_arg_index +SELECT create_distributed_function('add_with_param_names(int, int)', ''); + +-- valid distribution with distribution_arg_name +SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1'); + +-- valid distribution with distribution_arg_name -- case insensitive +SELECT create_distributed_function('add_with_param_names(int, int)', 
distribution_arg_name:='VaL1'); + +-- valid distribution with distribution_arg_index +SELECT create_distributed_function('add_with_param_names(int, int)','$1'); + +-- a function cannot be colocated with a table that is not "streaming" replicated +SET citus.shard_replication_factor TO 2; +CREATE TABLE replicated_table_func_test (a int); +SET citus.replication_model TO "statement"; +SELECT create_distributed_table('replicated_table_func_test', 'a'); +SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test'); + +-- a function cannot be colocated with a different distribution argument type +SET citus.shard_replication_factor TO 1; +CREATE TABLE replicated_table_func_test_2 (a bigint); +SET citus.replication_model TO "streaming"; +SELECT create_distributed_table('replicated_table_func_test_2', 'a'); +SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_2'); + +-- colocate_with cannot be used without distribution key +SELECT create_distributed_function('add_with_param_names(int, int)', colocate_with:='replicated_table_func_test_2'); + +-- a function cannot be colocated with a local table +CREATE TABLE replicated_table_func_test_3 (a bigint); +SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_3'); + +-- a function cannot be colocated with a reference table +SELECT create_reference_table('replicated_table_func_test_3'); +SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_3'); + +-- finally, colocate the function with a distributed table +SET citus.shard_replication_factor TO 1; +CREATE TABLE replicated_table_func_test_4 (a int); +SET citus.replication_model TO "streaming"; +SELECT create_distributed_table('replicated_table_func_test_4', 'a'); +SELECT create_distributed_function('add_with_param_names(int, 
int)', '$1', colocate_with:='replicated_table_func_test_4'); + +-- show that the colocationIds are the same +SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated +FROM pg_dist_partition, citus.pg_dist_object as objects +WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND + objects.objid = 'add_with_param_names(int, int)'::regprocedure; + +-- now, re-distributed with the default colocation option, we should still see that the same colocation +-- group preserved, because we're using the default shard creationg settings +SELECT create_distributed_function('add_with_param_names(int, int)', 'val1'); +SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated +FROM pg_dist_partition, citus.pg_dist_object as objects +WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND + objects.objid = 'add_with_param_names(int, int)'::regprocedure; + +-- if not paremeters are supplied, we'd see that function doesn't have +-- distribution_argument_index and colocationid +SELECT create_distributed_function('add_mixed_param_names(int, int)'); +SELECT distribution_argument_index is NULL, colocationid is NULL from citus.pg_dist_object +WHERE objid = 'add_mixed_param_names(int, int)'::regprocedure; + +-- also show that we can use the function +SELECT * FROM run_command_on_workers('SELECT function_tests.add_mixed_param_names(2,3);') ORDER BY 1,2; + -- clear objects SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA function_tests CASCADE; SELECT run_command_on_workers($$DROP SCHEMA function_tests CASCADE;$$); DROP USER functionuser; -SELECT run_command_on_workers($$DROP USER functionuser;$$); \ No newline at end of file +SELECT run_command_on_workers($$DROP USER functionuser;$$);