mirror of https://github.com/citusdata/citus.git
Merge pull request #2990 from citusdata/add_function_arguments
Add arguments to `create_distributed_function()`pull/2989/head
commit
900f5a61fc
|
@ -16,21 +16,43 @@
|
|||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
#include "funcapi.h"
|
||||
|
||||
#include "access/htup_details.h"
|
||||
#include "access/xact.h"
|
||||
#include "catalog/namespace.h"
|
||||
#include "catalog/pg_proc.h"
|
||||
#include "distributed/commands.h"
|
||||
#include "catalog/pg_type.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/metadata_sync.h"
|
||||
#include "distributed/metadata/distobject.h"
|
||||
#include "distributed/metadata/pg_dist_object.h"
|
||||
#include "distributed/multi_executor.h"
|
||||
#include "distributed/relation_access_tracking.h"
|
||||
#include "distributed/worker_transaction.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/fmgrprotos.h"
|
||||
#include "utils/fmgroids.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/syscache.h"
|
||||
|
||||
#define argumentStartsWith(arg, prefix) \
|
||||
(strncmp(arg, prefix, strlen(prefix)) == 0)
|
||||
|
||||
/* forward declaration for helper functions*/
|
||||
static char * GetFunctionDDLCommand(const RegProcedure funcOid);
|
||||
static int GetDistributionArgIndex(Oid functionOid, char *distributionArgumentName,
|
||||
Oid *distributionArgumentOid);
|
||||
static int GetFunctionColocationId(Oid functionOid, char *colocateWithName, Oid
|
||||
distributionArgumentOid);
|
||||
static void EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid
|
||||
distributionColumnType, Oid
|
||||
sourceRelationId);
|
||||
static void UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
|
||||
int *distribution_argument_index,
|
||||
int *colocationId);
|
||||
static void EnsureSequentialModeForFunctionDDL(void);
|
||||
|
||||
PG_FUNCTION_INFO_V1(create_distributed_function);
|
||||
|
@ -43,19 +65,57 @@ Datum
|
|||
create_distributed_function(PG_FUNCTION_ARGS)
|
||||
{
|
||||
RegProcedure funcOid = PG_GETARG_OID(0);
|
||||
|
||||
text *distributionArgumentNameText = NULL; /* optional */
|
||||
text *colocateWithText = NULL; /* optional */
|
||||
|
||||
const char *ddlCommand = NULL;
|
||||
ObjectAddress functionAddress = { 0 };
|
||||
|
||||
int distributionArgumentIndex = -1;
|
||||
Oid distributionArgumentOid = InvalidOid;
|
||||
int colocationId = -1;
|
||||
|
||||
char *distributionArgumentName = NULL;
|
||||
char *colocateWithTableName = NULL;
|
||||
|
||||
/* if called on NULL input, error out */
|
||||
if (funcOid == InvalidOid)
|
||||
{
|
||||
ereport(ERROR, (errmsg("create_distributed_function() requires a single "
|
||||
"parameter that is a valid function or procedure name "
|
||||
ereport(ERROR, (errmsg("the first parameter for create_distributed_function() "
|
||||
"should be a single a valid function or procedure name "
|
||||
"followed by a list of parameters in parantheses"),
|
||||
errhint("skip the parameters with OUT argtype as they are not "
|
||||
"part of the signature in PostgreSQL")));
|
||||
}
|
||||
|
||||
if (PG_ARGISNULL(1))
|
||||
{
|
||||
/*
|
||||
* Using the default value, so distribute the function but do not set
|
||||
* the distribution argument.
|
||||
*/
|
||||
distributionArgumentName = NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
distributionArgumentNameText = PG_GETARG_TEXT_P(1);
|
||||
distributionArgumentName = text_to_cstring(distributionArgumentNameText);
|
||||
}
|
||||
|
||||
if (PG_ARGISNULL(2))
|
||||
{
|
||||
ereport(ERROR, (errmsg("colocate_with parameter should not be NULL"),
|
||||
errhint("To use the default value, set colocate_with option "
|
||||
"to \"default\"")));
|
||||
}
|
||||
else
|
||||
{
|
||||
colocateWithText = PG_GETARG_TEXT_P(2);
|
||||
colocateWithTableName = text_to_cstring(colocateWithText);
|
||||
}
|
||||
|
||||
|
||||
ObjectAddressSet(functionAddress, ProcedureRelationId, funcOid);
|
||||
|
||||
/*
|
||||
|
@ -71,6 +131,42 @@ create_distributed_function(PG_FUNCTION_ARGS)
|
|||
|
||||
MarkObjectDistributed(&functionAddress);
|
||||
|
||||
if (distributionArgumentName == NULL)
|
||||
{
|
||||
/* cannot provide colocate_with without distribution_arg_name */
|
||||
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0)
|
||||
{
|
||||
char *functionName = get_func_name(funcOid);
|
||||
|
||||
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("cannot distribute the function \"%s\" since the "
|
||||
"distribution argument is not valid ", functionName),
|
||||
errhint("To provide \"colocate_with\" option, the"
|
||||
" distribution argument parameter should also "
|
||||
"be provided")));
|
||||
}
|
||||
|
||||
/* set distribution argument and colocationId to NULL */
|
||||
UpdateFunctionDistributionInfo(&functionAddress, NULL, NULL);
|
||||
}
|
||||
else if (distributionArgumentName != NULL)
|
||||
{
|
||||
/* get the argument index, or error out if we cannot find a valid index */
|
||||
distributionArgumentIndex =
|
||||
GetDistributionArgIndex(funcOid, distributionArgumentName,
|
||||
&distributionArgumentOid);
|
||||
|
||||
/* get the colocation id, or error out if we cannot find an appropriate one */
|
||||
colocationId =
|
||||
GetFunctionColocationId(funcOid, colocateWithTableName,
|
||||
distributionArgumentOid);
|
||||
|
||||
/* if provided, make sure to record the distribution argument and colocationId */
|
||||
UpdateFunctionDistributionInfo(&functionAddress, &distributionArgumentIndex,
|
||||
&colocationId);
|
||||
}
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
@ -91,6 +187,314 @@ CreateFunctionDDLCommandsIdempotent(const ObjectAddress *functionAddress)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetDistributionArgIndex calculates the distribution argument with the given
|
||||
* parameters. The function errors out if no valid argument is found.
|
||||
*/
|
||||
static int
|
||||
GetDistributionArgIndex(Oid functionOid, char *distributionArgumentName,
|
||||
Oid *distributionArgumentOid)
|
||||
{
|
||||
int distributionArgumentIndex = -1;
|
||||
|
||||
int numberOfArgs = 0;
|
||||
int argIndex = 0;
|
||||
Oid *argTypes = NULL;
|
||||
char **argNames = NULL;
|
||||
char *argModes = NULL;
|
||||
|
||||
HeapTuple proctup = NULL;
|
||||
|
||||
*distributionArgumentOid = InvalidOid;
|
||||
|
||||
proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(functionOid));
|
||||
if (!HeapTupleIsValid(proctup))
|
||||
{
|
||||
elog(ERROR, "cache lookup failed for function %u", functionOid);
|
||||
}
|
||||
|
||||
numberOfArgs = get_func_arg_info(proctup, &argTypes, &argNames, &argModes);
|
||||
|
||||
if (argumentStartsWith(distributionArgumentName, "$"))
|
||||
{
|
||||
/* skip the first character, we're safe because text_to_cstring pallocs */
|
||||
distributionArgumentName++;
|
||||
|
||||
/* throws error if the input is not an integer */
|
||||
distributionArgumentIndex = pg_atoi(distributionArgumentName, 4, 0);
|
||||
|
||||
if (distributionArgumentIndex < 1 || distributionArgumentIndex > numberOfArgs)
|
||||
{
|
||||
char *functionName = get_func_name(functionOid);
|
||||
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("cannot distribute the function \"%s\" since "
|
||||
"the distribution argument is not valid",
|
||||
functionName),
|
||||
errhint("Either provide a valid function argument name "
|
||||
"or a valid \"$paramIndex\" to "
|
||||
"create_distributed_function()")));
|
||||
}
|
||||
|
||||
/*
|
||||
* Internal representation for the distributionArgumentIndex
|
||||
* starts from 0 whereas user facing API starts from 1.
|
||||
*/
|
||||
distributionArgumentIndex -= 1;
|
||||
*distributionArgumentOid = argTypes[distributionArgumentIndex];
|
||||
|
||||
ReleaseSysCache(proctup);
|
||||
|
||||
Assert(*distributionArgumentOid != InvalidOid);
|
||||
|
||||
return distributionArgumentIndex;
|
||||
}
|
||||
|
||||
/*
|
||||
* The user didn't provid "$paramIndex" but potentially the name of the paramater.
|
||||
* So, loop over the arguments and try to find the argument name that matches
|
||||
* the parameter that user provided.
|
||||
*/
|
||||
for (argIndex = 0; argIndex < numberOfArgs; ++argIndex)
|
||||
{
|
||||
char *argNameOnIndex = argNames != NULL ? argNames[argIndex] : NULL;
|
||||
|
||||
if (argNameOnIndex != NULL &&
|
||||
pg_strncasecmp(argNameOnIndex, distributionArgumentName, NAMEDATALEN) == 0)
|
||||
{
|
||||
distributionArgumentIndex = argIndex;
|
||||
|
||||
*distributionArgumentOid = argTypes[argIndex];
|
||||
|
||||
/* we found, no need to continue */
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* we still couldn't find the argument, so error out */
|
||||
if (distributionArgumentIndex == -1)
|
||||
{
|
||||
char *functionName = get_func_name(functionOid);
|
||||
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("cannot distribute the function \"%s\" since the "
|
||||
"distribution argument is not valid ", functionName),
|
||||
errhint("Either provide a valid function argument name "
|
||||
"or a valid \"$paramIndex\" to "
|
||||
"create_distributed_function()")));
|
||||
}
|
||||
|
||||
ReleaseSysCache(proctup);
|
||||
|
||||
Assert(*distributionArgumentOid != InvalidOid);
|
||||
|
||||
return distributionArgumentIndex;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetFunctionColocationId gets the parameters for deciding the colocationId
|
||||
* of the function that is being distributed. The function errors out if it is
|
||||
* not possible to assign a colocationId to the input function.
|
||||
*/
|
||||
static int
|
||||
GetFunctionColocationId(Oid functionOid, char *colocateWithTableName,
|
||||
Oid distributionArgumentOid)
|
||||
{
|
||||
int colocationId = INVALID_COLOCATION_ID;
|
||||
bool createdColocationGroup = false;
|
||||
|
||||
/*
|
||||
* Get an exclusive lock on the colocation system catalog. Therefore, we
|
||||
* can be sure that there will no modifications on the colocation table
|
||||
* until this transaction is committed.
|
||||
*/
|
||||
Relation pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock);
|
||||
|
||||
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0)
|
||||
{
|
||||
/* check for default colocation group */
|
||||
colocationId = ColocationId(ShardCount, ShardReplicationFactor,
|
||||
distributionArgumentOid);
|
||||
|
||||
if (colocationId == INVALID_COLOCATION_ID)
|
||||
{
|
||||
colocationId =
|
||||
CreateColocationGroup(ShardCount, ShardReplicationFactor,
|
||||
distributionArgumentOid);
|
||||
|
||||
createdColocationGroup = true;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
Oid sourceRelationId =
|
||||
ResolveRelationId(cstring_to_text(colocateWithTableName), false);
|
||||
|
||||
EnsureFunctionCanBeColocatedWithTable(functionOid, distributionArgumentOid,
|
||||
sourceRelationId);
|
||||
|
||||
colocationId = TableColocationId(sourceRelationId);
|
||||
}
|
||||
|
||||
/*
|
||||
* If we created a new colocation group then we need to keep the lock to
|
||||
* prevent a concurrent create_distributed_table call from creating another
|
||||
* colocation group with the same parameters. If we're using an existing
|
||||
* colocation group then other transactions will use the same one.
|
||||
*/
|
||||
if (createdColocationGroup)
|
||||
{
|
||||
/* keep the exclusive lock */
|
||||
heap_close(pgDistColocation, NoLock);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* release the exclusive lock */
|
||||
heap_close(pgDistColocation, ExclusiveLock);
|
||||
}
|
||||
|
||||
|
||||
return colocationId;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* EnsureFunctionCanBeColocatedWithTable checks whether the given arguments are
|
||||
* suitable to distribute the function to be colocated with given source table.
|
||||
*/
|
||||
static void
|
||||
EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnType,
|
||||
Oid sourceRelationId)
|
||||
{
|
||||
DistTableCacheEntry *sourceTableEntry = DistributedTableCacheEntry(sourceRelationId);
|
||||
char sourceDistributionMethod = sourceTableEntry->partitionMethod;
|
||||
char sourceReplicationModel = sourceTableEntry->replicationModel;
|
||||
Var *sourceDistributionColumn = DistPartitionKey(sourceRelationId);
|
||||
Oid sourceDistributionColumnType = InvalidOid;
|
||||
|
||||
if (sourceDistributionMethod != DISTRIBUTE_BY_HASH)
|
||||
{
|
||||
char *functionName = get_func_name(functionOid);
|
||||
char *sourceRelationName = get_rel_name(sourceRelationId);
|
||||
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot colocate function \"%s\" and table \"%s\" because "
|
||||
"colocate_with option is only supported for hash "
|
||||
"distributed tables.", functionName,
|
||||
sourceRelationName)));
|
||||
}
|
||||
|
||||
if (sourceReplicationModel != REPLICATION_MODEL_STREAMING)
|
||||
{
|
||||
char *functionName = get_func_name(functionOid);
|
||||
char *sourceRelationName = get_rel_name(sourceRelationId);
|
||||
|
||||
ereport(ERROR, (errmsg("cannot colocate function \"%s\" and table \"%s\"",
|
||||
functionName, sourceRelationName),
|
||||
errdetail("Citus currently only supports colocating function "
|
||||
"with distributed tables that are created using "
|
||||
"streaming replication model."),
|
||||
errhint("When distributing tables make sure that "
|
||||
"\"citus.replication_model\" is set to \"streaming\"")));
|
||||
}
|
||||
|
||||
sourceDistributionColumnType = sourceDistributionColumn->vartype;
|
||||
if (sourceDistributionColumnType != distributionColumnType)
|
||||
{
|
||||
char *functionName = get_func_name(functionOid);
|
||||
char *sourceRelationName = get_rel_name(sourceRelationId);
|
||||
|
||||
ereport(ERROR, (errmsg("cannot colocate function \"%s\" and table \"%s\" "
|
||||
"because distribution column types don't match",
|
||||
sourceRelationName, functionName)));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* UpdateFunctionDistributionInfo gets object address of a function and
|
||||
* updates its distribution_argument_index and colocationId in pg_dist_object.
|
||||
*/
|
||||
static void
|
||||
UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
|
||||
int *distribution_argument_index,
|
||||
int *colocationId)
|
||||
{
|
||||
const bool indexOK = true;
|
||||
|
||||
Relation pgDistObjectRel = NULL;
|
||||
TupleDesc tupleDescriptor = NULL;
|
||||
ScanKeyData scanKey[3];
|
||||
SysScanDesc scanDescriptor = NULL;
|
||||
HeapTuple heapTuple = NULL;
|
||||
Datum values[Natts_pg_dist_object];
|
||||
bool isnull[Natts_pg_dist_object];
|
||||
bool replace[Natts_pg_dist_object];
|
||||
|
||||
pgDistObjectRel = heap_open(DistObjectRelationId(), RowExclusiveLock);
|
||||
tupleDescriptor = RelationGetDescr(pgDistObjectRel);
|
||||
|
||||
/* scan pg_dist_object for classid = $1 AND objid = $2 AND objsubid = $3 via index */
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_object_classid, BTEqualStrategyNumber, F_OIDEQ,
|
||||
ObjectIdGetDatum(distAddress->classId));
|
||||
ScanKeyInit(&scanKey[1], Anum_pg_dist_object_objid, BTEqualStrategyNumber, F_OIDEQ,
|
||||
ObjectIdGetDatum(distAddress->objectId));
|
||||
ScanKeyInit(&scanKey[2], Anum_pg_dist_object_objsubid, BTEqualStrategyNumber,
|
||||
F_INT4EQ, ObjectIdGetDatum(distAddress->objectSubId));
|
||||
|
||||
scanDescriptor = systable_beginscan(pgDistObjectRel, DistObjectPrimaryKeyIndexId(),
|
||||
indexOK,
|
||||
NULL, 3, scanKey);
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
if (!HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not find valid entry for node \"%d,%d,%d\" "
|
||||
"in pg_dist_object", distAddress->classId,
|
||||
distAddress->objectId, distAddress->objectSubId)));
|
||||
}
|
||||
|
||||
memset(replace, 0, sizeof(replace));
|
||||
|
||||
replace[Anum_pg_dist_object_distribution_argument_index - 1] = true;
|
||||
|
||||
if (distribution_argument_index != NULL)
|
||||
{
|
||||
values[Anum_pg_dist_object_distribution_argument_index - 1] = Int32GetDatum(
|
||||
*distribution_argument_index);
|
||||
isnull[Anum_pg_dist_object_distribution_argument_index - 1] = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
isnull[Anum_pg_dist_object_distribution_argument_index - 1] = true;
|
||||
}
|
||||
|
||||
replace[Anum_pg_dist_object_colocationid - 1] = true;
|
||||
if (colocationId != NULL)
|
||||
{
|
||||
values[Anum_pg_dist_object_colocationid - 1] = Int32GetDatum(*colocationId);
|
||||
isnull[Anum_pg_dist_object_colocationid - 1] = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
isnull[Anum_pg_dist_object_colocationid - 1] = true;
|
||||
}
|
||||
|
||||
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
|
||||
|
||||
CatalogTupleUpdate(pgDistObjectRel, &heapTuple->t_self, heapTuple);
|
||||
|
||||
CitusInvalidateRelcacheByRelid(DistObjectRelationId());
|
||||
|
||||
CommandCounterIncrement();
|
||||
|
||||
systable_endscan(scanDescriptor);
|
||||
|
||||
heap_close(pgDistObjectRel, NoLock);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetFunctionDDLCommand returns the complete "CREATE OR REPLACE FUNCTION ..." statement for
|
||||
* the specified function.
|
||||
|
|
|
@ -37,7 +37,6 @@
|
|||
static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
|
||||
Datum *paramValues);
|
||||
|
||||
|
||||
PG_FUNCTION_INFO_V1(master_unmark_object_distributed);
|
||||
|
||||
|
||||
|
@ -151,7 +150,10 @@ MarkObjectDistributed(const ObjectAddress *distAddress)
|
|||
|
||||
|
||||
/*
|
||||
* ExecuteCommandAsSuperuser executes a command via SPI as superuser.
|
||||
* ExecuteCommandAsSuperuser executes a command via SPI as superuser. Using this
|
||||
* function (and in general SPI/SQL with superuser) should be avoided as much as
|
||||
* possible. This is to prevent any user to exploit the superuser access via
|
||||
* triggers.
|
||||
*/
|
||||
static int
|
||||
ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
|
||||
|
|
|
@ -1,7 +1,11 @@
|
|||
CREATE OR REPLACE FUNCTION create_distributed_function(function_name regprocedure)
|
||||
CREATE OR REPLACE FUNCTION create_distributed_function(function_name regprocedure,
|
||||
distribution_arg_name text DEFAULT NULL,
|
||||
colocate_with text DEFAULT 'default')
|
||||
RETURNS void
|
||||
LANGUAGE C CALLED ON NULL INPUT
|
||||
AS 'MODULE_PATHNAME', $$create_distributed_function$$;
|
||||
|
||||
COMMENT ON FUNCTION create_distributed_function(function_name regprocedure)
|
||||
COMMENT ON FUNCTION create_distributed_function(function_name regprocedure,
|
||||
distribution_arg_name text,
|
||||
colocate_with text)
|
||||
IS 'creates a distributed function';
|
||||
|
|
|
@ -1,7 +1,11 @@
|
|||
CREATE OR REPLACE FUNCTION create_distributed_function(function_name regprocedure)
|
||||
CREATE OR REPLACE FUNCTION create_distributed_function(function_name regprocedure,
|
||||
distribution_arg_name text DEFAULT NULL,
|
||||
colocate_with text DEFAULT 'default')
|
||||
RETURNS void
|
||||
LANGUAGE C CALLED ON NULL INPUT
|
||||
AS 'MODULE_PATHNAME', $$create_distributed_function$$;
|
||||
|
||||
COMMENT ON FUNCTION create_distributed_function(function_name regprocedure)
|
||||
COMMENT ON FUNCTION create_distributed_function(function_name regprocedure,
|
||||
distribution_arg_name text,
|
||||
colocate_with text)
|
||||
IS 'creates a distributed function';
|
||||
|
|
|
@ -18,7 +18,7 @@ CREATE FUNCTION add(integer, integer) RETURNS integer
|
|||
LANGUAGE SQL
|
||||
IMMUTABLE
|
||||
RETURNS NULL ON NULL INPUT;
|
||||
SELECT create_distributed_function('add(int,int)');
|
||||
SELECT create_distributed_function('add(int,int)', '$1');
|
||||
create_distributed_function
|
||||
-----------------------------
|
||||
|
||||
|
@ -39,7 +39,7 @@ CREATE TYPE dup_result AS (f1 int, f2 text);
|
|||
CREATE FUNCTION dup(int) RETURNS dup_result
|
||||
AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$
|
||||
LANGUAGE SQL;
|
||||
SELECT create_distributed_function('dup(int)');
|
||||
SELECT create_distributed_function('dup(int)', '$1');
|
||||
create_distributed_function
|
||||
-----------------------------
|
||||
|
||||
|
@ -52,6 +52,195 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY
|
|||
localhost | 57638 | t | (42,"42 is text")
|
||||
(2 rows)
|
||||
|
||||
CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer
|
||||
AS 'select $1 + $2;'
|
||||
LANGUAGE SQL
|
||||
IMMUTABLE
|
||||
RETURNS NULL ON NULL INPUT;
|
||||
CREATE FUNCTION add_without_param_names(integer, integer) RETURNS integer
|
||||
AS 'select $1 + $2;'
|
||||
LANGUAGE SQL
|
||||
IMMUTABLE
|
||||
RETURNS NULL ON NULL INPUT;
|
||||
CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer
|
||||
AS 'select $1 + $2;'
|
||||
LANGUAGE SQL
|
||||
IMMUTABLE
|
||||
RETURNS NULL ON NULL INPUT;
|
||||
-- postgres doesn't accept parameter names in the regprocedure input
|
||||
SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1');
|
||||
ERROR: syntax error at or near "int"
|
||||
LINE 1: SELECT create_distributed_function('add_with_param_names(val...
|
||||
^
|
||||
CONTEXT: invalid type name "val1 int"
|
||||
-- invalid distribution_arg_name
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='test');
|
||||
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
|
||||
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='int');
|
||||
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
|
||||
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
|
||||
-- invalid distribution_arg_index
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', '$0');
|
||||
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
|
||||
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', '$-1');
|
||||
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
|
||||
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', '$-10');
|
||||
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
|
||||
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', '$3');
|
||||
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
|
||||
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', '$1a');
|
||||
ERROR: invalid input syntax for integer: "1a"
|
||||
-- non existing column name
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', 'aaa');
|
||||
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
|
||||
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
|
||||
-- NULL function
|
||||
SELECT create_distributed_function(NULL);
|
||||
ERROR: the first parameter for create_distributed_function() should be a single a valid function or procedure name followed by a list of parameters in parantheses
|
||||
HINT: skip the parameters with OUT argtype as they are not part of the signature in PostgreSQL
|
||||
-- NULL colocate_with
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', '$1', NULL);
|
||||
ERROR: colocate_with parameter should not be NULL
|
||||
HINT: To use the default value, set colocate_with option to "default"
|
||||
-- empty string distribution_arg_index
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', '');
|
||||
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
|
||||
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
|
||||
-- valid distribution with distribution_arg_name
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1');
|
||||
create_distributed_function
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- valid distribution with distribution_arg_name -- case insensitive
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='VaL1');
|
||||
create_distributed_function
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- valid distribution with distribution_arg_index
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)','$1');
|
||||
create_distributed_function
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- a function cannot be colocated with a table that is not "streaming" replicated
|
||||
SET citus.shard_replication_factor TO 2;
|
||||
CREATE TABLE replicated_table_func_test (a int);
|
||||
SET citus.replication_model TO "statement";
|
||||
SELECT create_distributed_table('replicated_table_func_test', 'a');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test');
|
||||
ERROR: cannot colocate function "add_with_param_names" and table "replicated_table_func_test"
|
||||
DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model.
|
||||
HINT: When distributing tables make sure that "citus.replication_model" is set to "streaming"
|
||||
-- a function cannot be colocated with a different distribution argument type
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE TABLE replicated_table_func_test_2 (a bigint);
|
||||
SET citus.replication_model TO "streaming";
|
||||
SELECT create_distributed_table('replicated_table_func_test_2', 'a');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_2');
|
||||
ERROR: cannot colocate function "replicated_table_func_test_2" and table "add_with_param_names" because distribution column types don't match
|
||||
-- colocate_with cannot be used without distribution key
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', colocate_with:='replicated_table_func_test_2');
|
||||
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
|
||||
HINT: To provide "colocate_with" option, the distribution argument parameter should also be provided
|
||||
-- a function cannot be colocated with a local table
|
||||
CREATE TABLE replicated_table_func_test_3 (a bigint);
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_3');
|
||||
ERROR: relation replicated_table_func_test_3 is not distributed
|
||||
-- a function cannot be colocated with a reference table
|
||||
SELECT create_reference_table('replicated_table_func_test_3');
|
||||
create_reference_table
|
||||
------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_3');
|
||||
ERROR: cannot colocate function "add_with_param_names" and table "replicated_table_func_test_3" because colocate_with option is only supported for hash distributed tables.
|
||||
-- finally, colocate the function with a distributed table
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE TABLE replicated_table_func_test_4 (a int);
|
||||
SET citus.replication_model TO "streaming";
|
||||
SELECT create_distributed_table('replicated_table_func_test_4', 'a');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test_4');
|
||||
create_distributed_function
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- show that the colocationIds are the same
|
||||
SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated
|
||||
FROM pg_dist_partition, citus.pg_dist_object as objects
|
||||
WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND
|
||||
objects.objid = 'add_with_param_names(int, int)'::regprocedure;
|
||||
table_and_function_colocated
|
||||
------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- now, re-distributed with the default colocation option, we should still see that the same colocation
|
||||
-- group preserved, because we're using the default shard creationg settings
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1');
|
||||
create_distributed_function
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated
|
||||
FROM pg_dist_partition, citus.pg_dist_object as objects
|
||||
WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND
|
||||
objects.objid = 'add_with_param_names(int, int)'::regprocedure;
|
||||
table_and_function_colocated
|
||||
------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- if not paremeters are supplied, we'd see that function doesn't have
|
||||
-- distribution_argument_index and colocationid
|
||||
SELECT create_distributed_function('add_mixed_param_names(int, int)');
|
||||
create_distributed_function
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT distribution_argument_index is NULL, colocationid is NULL from citus.pg_dist_object
|
||||
WHERE objid = 'add_mixed_param_names(int, int)'::regprocedure;
|
||||
?column? | ?column?
|
||||
----------+----------
|
||||
t | t
|
||||
(1 row)
|
||||
|
||||
-- also show that we can use the function
|
||||
SELECT * FROM run_command_on_workers('SELECT function_tests.add_mixed_param_names(2,3);') ORDER BY 1,2;
|
||||
nodename | nodeport | success | result
|
||||
-----------+----------+---------+--------
|
||||
localhost | 57637 | t | 5
|
||||
localhost | 57638 | t | 5
|
||||
(2 rows)
|
||||
|
||||
-- clear objects
|
||||
SET client_min_messages TO error; -- suppress cascading objects dropping
|
||||
DROP SCHEMA function_tests CASCADE;
|
||||
|
|
|
@ -1806,7 +1806,7 @@ step s2-public-schema:
|
|||
|
||||
step s2-distribute-function:
|
||||
CREATE OR REPLACE FUNCTION add (INT,INT) RETURNS INT AS $$ SELECT $1 + $2 $$ LANGUAGE SQL;
|
||||
SELECT create_distributed_function('add(INT,INT)');
|
||||
SELECT create_distributed_function('add(INT,INT)', '$1');
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
@ -1921,7 +1921,7 @@ step s2-public-schema:
|
|||
|
||||
step s2-distribute-function:
|
||||
CREATE OR REPLACE FUNCTION add (INT,INT) RETURNS INT AS $$ SELECT $1 + $2 $$ LANGUAGE SQL;
|
||||
SELECT create_distributed_function('add(INT,INT)');
|
||||
SELECT create_distributed_function('add(INT,INT)', '$1');
|
||||
|
||||
create_distributed_function
|
||||
|
||||
|
@ -2045,7 +2045,7 @@ step s2-create-schema:
|
|||
|
||||
step s2-distribute-function:
|
||||
CREATE OR REPLACE FUNCTION add (INT,INT) RETURNS INT AS $$ SELECT $1 + $2 $$ LANGUAGE SQL;
|
||||
SELECT create_distributed_function('add(INT,INT)');
|
||||
SELECT create_distributed_function('add(INT,INT)', '$1');
|
||||
|
||||
create_distributed_function
|
||||
|
||||
|
|
|
@ -117,7 +117,7 @@ step "s2-create-table-with-type"
|
|||
step "s2-distribute-function"
|
||||
{
|
||||
CREATE OR REPLACE FUNCTION add (INT,INT) RETURNS INT AS $$ SELECT $1 + $2 $$ LANGUAGE SQL;
|
||||
SELECT create_distributed_function('add(INT,INT)');
|
||||
SELECT create_distributed_function('add(INT,INT)', '$1');
|
||||
}
|
||||
|
||||
step "s2-begin"
|
||||
|
|
|
@ -15,7 +15,7 @@ CREATE FUNCTION add(integer, integer) RETURNS integer
|
|||
LANGUAGE SQL
|
||||
IMMUTABLE
|
||||
RETURNS NULL ON NULL INPUT;
|
||||
SELECT create_distributed_function('add(int,int)');
|
||||
SELECT create_distributed_function('add(int,int)', '$1');
|
||||
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
|
||||
|
||||
|
||||
|
@ -30,12 +30,120 @@ CREATE FUNCTION dup(int) RETURNS dup_result
|
|||
AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$
|
||||
LANGUAGE SQL;
|
||||
|
||||
SELECT create_distributed_function('dup(int)');
|
||||
SELECT create_distributed_function('dup(int)', '$1');
|
||||
SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2;
|
||||
|
||||
CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer
|
||||
AS 'select $1 + $2;'
|
||||
LANGUAGE SQL
|
||||
IMMUTABLE
|
||||
RETURNS NULL ON NULL INPUT;
|
||||
|
||||
CREATE FUNCTION add_without_param_names(integer, integer) RETURNS integer
|
||||
AS 'select $1 + $2;'
|
||||
LANGUAGE SQL
|
||||
IMMUTABLE
|
||||
RETURNS NULL ON NULL INPUT;
|
||||
|
||||
CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer
|
||||
AS 'select $1 + $2;'
|
||||
LANGUAGE SQL
|
||||
IMMUTABLE
|
||||
RETURNS NULL ON NULL INPUT;
|
||||
|
||||
-- postgres doesn't accept parameter names in the regprocedure input
|
||||
SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1');
|
||||
|
||||
-- invalid distribution_arg_name
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='test');
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='int');
|
||||
|
||||
-- invalid distribution_arg_index
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', '$0');
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', '$-1');
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', '$-10');
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', '$3');
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', '$1a');
|
||||
|
||||
-- non existing column name
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', 'aaa');
|
||||
|
||||
-- NULL function
|
||||
SELECT create_distributed_function(NULL);
|
||||
|
||||
-- NULL colocate_with
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', '$1', NULL);
|
||||
|
||||
-- empty string distribution_arg_index
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', '');
|
||||
|
||||
-- valid distribution with distribution_arg_name
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1');
|
||||
|
||||
-- valid distribution with distribution_arg_name -- case insensitive
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='VaL1');
|
||||
|
||||
-- valid distribution with distribution_arg_index
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)','$1');
|
||||
|
||||
-- a function cannot be colocated with a table that is not "streaming" replicated
|
||||
SET citus.shard_replication_factor TO 2;
|
||||
CREATE TABLE replicated_table_func_test (a int);
|
||||
SET citus.replication_model TO "statement";
|
||||
SELECT create_distributed_table('replicated_table_func_test', 'a');
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test');
|
||||
|
||||
-- a function cannot be colocated with a different distribution argument type
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE TABLE replicated_table_func_test_2 (a bigint);
|
||||
SET citus.replication_model TO "streaming";
|
||||
SELECT create_distributed_table('replicated_table_func_test_2', 'a');
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_2');
|
||||
|
||||
-- colocate_with cannot be used without distribution key
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', colocate_with:='replicated_table_func_test_2');
|
||||
|
||||
-- a function cannot be colocated with a local table
|
||||
CREATE TABLE replicated_table_func_test_3 (a bigint);
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_3');
|
||||
|
||||
-- a function cannot be colocated with a reference table
|
||||
SELECT create_reference_table('replicated_table_func_test_3');
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_3');
|
||||
|
||||
-- finally, colocate the function with a distributed table
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE TABLE replicated_table_func_test_4 (a int);
|
||||
SET citus.replication_model TO "streaming";
|
||||
SELECT create_distributed_table('replicated_table_func_test_4', 'a');
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test_4');
|
||||
|
||||
-- show that the colocationIds are the same
|
||||
SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated
|
||||
FROM pg_dist_partition, citus.pg_dist_object as objects
|
||||
WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND
|
||||
objects.objid = 'add_with_param_names(int, int)'::regprocedure;
|
||||
|
||||
-- now, re-distributed with the default colocation option, we should still see that the same colocation
|
||||
-- group preserved, because we're using the default shard creationg settings
|
||||
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1');
|
||||
SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated
|
||||
FROM pg_dist_partition, citus.pg_dist_object as objects
|
||||
WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND
|
||||
objects.objid = 'add_with_param_names(int, int)'::regprocedure;
|
||||
|
||||
-- if not paremeters are supplied, we'd see that function doesn't have
|
||||
-- distribution_argument_index and colocationid
|
||||
SELECT create_distributed_function('add_mixed_param_names(int, int)');
|
||||
SELECT distribution_argument_index is NULL, colocationid is NULL from citus.pg_dist_object
|
||||
WHERE objid = 'add_mixed_param_names(int, int)'::regprocedure;
|
||||
|
||||
-- also show that we can use the function
|
||||
SELECT * FROM run_command_on_workers('SELECT function_tests.add_mixed_param_names(2,3);') ORDER BY 1,2;
|
||||
|
||||
-- clear objects
|
||||
SET client_min_messages TO error; -- suppress cascading objects dropping
|
||||
DROP SCHEMA function_tests CASCADE;
|
||||
SELECT run_command_on_workers($$DROP SCHEMA function_tests CASCADE;$$);
|
||||
DROP USER functionuser;
|
||||
SELECT run_command_on_workers($$DROP USER functionuser;$$);
|
||||
SELECT run_command_on_workers($$DROP USER functionuser;$$);
|
||||
|
|
Loading…
Reference in New Issue