Add parameters to create_distributed_function()

With this commit, we're changing the API for create_distributed_function()
such that users can provide the distribution argument and the colocation
information.
pull/2990/head
Onder Kalaci 2019-09-17 14:14:45 +02:00
parent ff100b2720
commit d7e2968120
8 changed files with 728 additions and 17 deletions

View File

@ -16,21 +16,43 @@
*/
#include "postgres.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/namespace.h"
#include "catalog/pg_proc.h"
#include "distributed/commands.h"
#include "catalog/pg_type.h"
#include "distributed/colocation_utils.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata/pg_dist_object.h"
#include "distributed/multi_executor.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/worker_transaction.h"
#include "utils/builtins.h"
#include "utils/fmgrprotos.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
/*
 * argumentStartsWith evaluates to true when the NUL-terminated string arg
 * begins with the NUL-terminated string prefix. Note that prefix is
 * evaluated twice, so it must not have side effects.
 */
#define argumentStartsWith(arg, prefix) \
	(strncmp((arg), (prefix), strlen((prefix))) == 0)
/* forward declaration for helper functions*/
static char * GetFunctionDDLCommand(const RegProcedure funcOid);
static int GetDistributionArgIndex(Oid functionOid, char *distributionArgumentName,
Oid *distributionArgumentOid);
static int GetFunctionColocationId(Oid functionOid, char *colocateWithName, Oid
distributionArgumentOid);
static void EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid
distributionColumnType, Oid
sourceRelationId);
static void UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
int *distribution_argument_index,
int *colocationId);
static void EnsureSequentialModeForFunctionDDL(void);
PG_FUNCTION_INFO_V1(create_distributed_function);
@ -43,19 +65,57 @@ Datum
create_distributed_function(PG_FUNCTION_ARGS)
{
RegProcedure funcOid = PG_GETARG_OID(0);
text *distributionArgumentNameText = NULL; /* optional */
text *colocateWithText = NULL; /* optional */
const char *ddlCommand = NULL;
ObjectAddress functionAddress = { 0 };
int distributionArgumentIndex = -1;
Oid distributionArgumentOid = InvalidOid;
int colocationId = -1;
char *distributionArgumentName = NULL;
char *colocateWithTableName = NULL;
/* if called on NULL input, error out */
if (funcOid == InvalidOid)
{
ereport(ERROR, (errmsg("create_distributed_function() requires a single "
"parameter that is a valid function or procedure name "
ereport(ERROR, (errmsg("the first parameter for create_distributed_function() "
"should be a single a valid function or procedure name "
"followed by a list of parameters in parantheses"),
errhint("skip the parameters with OUT argtype as they are not "
"part of the signature in PostgreSQL")));
}
if (PG_ARGISNULL(1))
{
/*
* Using the default value, so distribute the function but do not set
* the distribution argument.
*/
distributionArgumentName = NULL;
}
else
{
distributionArgumentNameText = PG_GETARG_TEXT_P(1);
distributionArgumentName = text_to_cstring(distributionArgumentNameText);
}
if (PG_ARGISNULL(2))
{
ereport(ERROR, (errmsg("colocate_with parameter should not be NULL"),
errhint("To use the default value, set colocate_with option "
"to \"default\"")));
}
else
{
colocateWithText = PG_GETARG_TEXT_P(2);
colocateWithTableName = text_to_cstring(colocateWithText);
}
ObjectAddressSet(functionAddress, ProcedureRelationId, funcOid);
/*
@ -71,6 +131,42 @@ create_distributed_function(PG_FUNCTION_ARGS)
MarkObjectDistributed(&functionAddress);
if (distributionArgumentName == NULL)
{
/* cannot provide colocate_with without distribution_arg_name */
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0)
{
char *functionName = get_func_name(funcOid);
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot distribute the function \"%s\" since the "
"distribution argument is not valid ", functionName),
errhint("To provide \"colocate_with\" option, the"
" distribution argument parameter should also "
"be provided")));
}
/* set distribution argument and colocationId to NULL */
UpdateFunctionDistributionInfo(&functionAddress, NULL, NULL);
}
else if (distributionArgumentName != NULL)
{
/* get the argument index, or error out if we cannot find a valid index */
distributionArgumentIndex =
GetDistributionArgIndex(funcOid, distributionArgumentName,
&distributionArgumentOid);
/* get the colocation id, or error out if we cannot find an appropriate one */
colocationId =
GetFunctionColocationId(funcOid, colocateWithTableName,
distributionArgumentOid);
/* if provided, make sure to record the distribution argument and colocationId */
UpdateFunctionDistributionInfo(&functionAddress, &distributionArgumentIndex,
&colocationId);
}
PG_RETURN_VOID();
}
@ -91,6 +187,314 @@ CreateFunctionDDLCommandsIdempotent(const ObjectAddress *functionAddress)
}
/*
 * GetDistributionArgIndex resolves the user supplied distribution argument
 * to a zero-based position in the function's argument list and stores the
 * type of that argument in *distributionArgumentOid.
 *
 * The argument can be given either as "$N" (one-based position) or as an
 * argument name, which is matched case-insensitively. The function errors
 * out when the input does not identify an argument of the function.
 */
static int
GetDistributionArgIndex(Oid functionOid, char *distributionArgumentName,
						Oid *distributionArgumentOid)
{
	Oid *argTypes = NULL;
	char **argNames = NULL;
	char *argModes = NULL;
	int argCount = 0;
	int resolvedIndex = -1;
	HeapTuple procTuple = NULL;

	*distributionArgumentOid = InvalidOid;

	procTuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(functionOid));
	if (!HeapTupleIsValid(procTuple))
	{
		elog(ERROR, "cache lookup failed for function %u", functionOid);
	}

	argCount = get_func_arg_info(procTuple, &argTypes, &argNames, &argModes);

	if (argumentStartsWith(distributionArgumentName, "$"))
	{
		/*
		 * Positional form: parse everything after the leading '$' as an
		 * integer; pg_atoi throws an error if it is not one.
		 */
		int userFacingIndex = pg_atoi(distributionArgumentName + 1, 4, 0);

		if (userFacingIndex < 1 || userFacingIndex > argCount)
		{
			char *functionName = get_func_name(functionOid);

			ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
							errmsg("cannot distribute the function \"%s\" since "
								   "the distribution argument is not valid",
								   functionName),
							errhint("Either provide a valid function argument name "
									"or a valid \"$paramIndex\" to "
									"create_distributed_function()")));
		}

		/* the user facing API counts from 1, internally we count from 0 */
		resolvedIndex = userFacingIndex - 1;
		*distributionArgumentOid = argTypes[resolvedIndex];
	}
	else
	{
		int argPosition = 0;

		/*
		 * The user didn't provide "$paramIndex", so treat the input as a
		 * parameter name and take the first argument whose name matches.
		 */
		for (argPosition = 0;
			 resolvedIndex == -1 && argPosition < argCount;
			 argPosition++)
		{
			const char *candidateName =
				(argNames == NULL) ? NULL : argNames[argPosition];

			/* unnamed arguments can never match */
			if (candidateName == NULL)
			{
				continue;
			}

			if (pg_strncasecmp(candidateName, distributionArgumentName,
							   NAMEDATALEN) != 0)
			{
				continue;
			}

			resolvedIndex = argPosition;
			*distributionArgumentOid = argTypes[argPosition];
		}

		/* no argument carries the given name, so error out */
		if (resolvedIndex == -1)
		{
			char *functionName = get_func_name(functionOid);

			ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
							errmsg("cannot distribute the function \"%s\" since the "
								   "distribution argument is not valid ", functionName),
							errhint("Either provide a valid function argument name "
									"or a valid \"$paramIndex\" to "
									"create_distributed_function()")));
		}
	}

	ReleaseSysCache(procTuple);

	Assert(*distributionArgumentOid != InvalidOid);

	return resolvedIndex;
}
/*
 * GetFunctionColocationId decides which colocation group the function that
 * is being distributed belongs to and returns the id of that group.
 *
 * With colocate_with set to "default" (case-insensitive), the group matching
 * the current ShardCount, ShardReplicationFactor and the distribution
 * argument type is used, and created when ColocationId() finds none.
 * Otherwise colocateWithTableName is resolved as a relation and, after
 * EnsureFunctionCanBeColocatedWithTable() accepts it, the colocation id of
 * that table is returned.
 */
static int
GetFunctionColocationId(Oid functionOid, char *colocateWithTableName,
						Oid distributionArgumentOid)
{
	int functionColocationId = INVALID_COLOCATION_ID;
	bool newGroupCreated = false;
	LOCKMODE lockToRelease = NoLock;

	/*
	 * Take an exclusive lock on the colocation catalog so that there will be
	 * no modifications to it while we look for (or create) our group.
	 */
	Relation colocationCatalog = heap_open(DistColocationRelationId(), ExclusiveLock);

	bool useDefaultGroup =
		(pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0);

	if (!useDefaultGroup)
	{
		/* colocate with an explicitly named table */
		text *sourceRelationNameText = cstring_to_text(colocateWithTableName);
		Oid sourceRelationId = ResolveRelationId(sourceRelationNameText, false);

		EnsureFunctionCanBeColocatedWithTable(functionOid, distributionArgumentOid,
											  sourceRelationId);

		functionColocationId = TableColocationId(sourceRelationId);
	}
	else
	{
		/* look for the default colocation group, create it if missing */
		functionColocationId = ColocationId(ShardCount, ShardReplicationFactor,
											distributionArgumentOid);

		if (functionColocationId == INVALID_COLOCATION_ID)
		{
			functionColocationId =
				CreateColocationGroup(ShardCount, ShardReplicationFactor,
									  distributionArgumentOid);
			newGroupCreated = true;
		}
	}

	/*
	 * When a new colocation group was created the lock is kept until the end
	 * of the transaction, which prevents a concurrent create_distributed_table
	 * call from creating another group with the same parameters. When an
	 * existing group is reused other transactions will pick the same one, so
	 * the lock can be released right away.
	 */
	lockToRelease = newGroupCreated ? NoLock : ExclusiveLock;
	heap_close(colocationCatalog, lockToRelease);

	return functionColocationId;
}
/*
 * EnsureFunctionCanBeColocatedWithTable checks whether the function can be
 * colocated with the given source table and errors out if it cannot.
 *
 * functionOid is only used to build the error messages.
 * distributionColumnType is the type of the function's distribution argument.
 * sourceRelationId is the table the function should be colocated with.
 *
 * The checks are, in order: the table is hash distributed, the table uses
 * the streaming replication model, and the table's distribution column has
 * the same type as the function's distribution argument.
 */
static void
EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnType,
									  Oid sourceRelationId)
{
	DistTableCacheEntry *sourceTableEntry = DistributedTableCacheEntry(sourceRelationId);
	char sourceDistributionMethod = sourceTableEntry->partitionMethod;
	char sourceReplicationModel = sourceTableEntry->replicationModel;
	Var *sourceDistributionColumn = DistPartitionKey(sourceRelationId);
	Oid sourceDistributionColumnType = InvalidOid;

	if (sourceDistributionMethod != DISTRIBUTE_BY_HASH)
	{
		char *functionName = get_func_name(functionOid);
		char *sourceRelationName = get_rel_name(sourceRelationId);

		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("cannot colocate function \"%s\" and table \"%s\" because "
							   "colocate_with option is only supported for hash "
							   "distributed tables.", functionName,
							   sourceRelationName)));
	}

	if (sourceReplicationModel != REPLICATION_MODEL_STREAMING)
	{
		char *functionName = get_func_name(functionOid);
		char *sourceRelationName = get_rel_name(sourceRelationId);

		ereport(ERROR, (errmsg("cannot colocate function \"%s\" and table \"%s\"",
							   functionName, sourceRelationName),
						errdetail("Citus currently only supports colocating function "
								  "with distributed tables that are created using "
								  "streaming replication model."),
						errhint("When distributing tables make sure that "
								"\"citus.replication_model\" is set to \"streaming\"")));
	}

	/*
	 * The distribution column is only dereferenced after the table is known
	 * to be hash distributed.
	 */
	sourceDistributionColumnType = sourceDistributionColumn->vartype;
	if (sourceDistributionColumnType != distributionColumnType)
	{
		char *functionName = get_func_name(functionOid);
		char *sourceRelationName = get_rel_name(sourceRelationId);

		/* argument order must follow the message: function first, then table */
		ereport(ERROR, (errmsg("cannot colocate function \"%s\" and table \"%s\" "
							   "because distribution column types don't match",
							   functionName, sourceRelationName)));
	}
}
/*
 * UpdateFunctionDistributionInfo gets object address of a function and
 * updates its distribution_argument_index and colocationId in pg_dist_object.
 *
 * Passing NULL for distribution_argument_index or colocationId stores SQL
 * NULL in the corresponding column; otherwise the pointed-to value is stored.
 * The function errors out if pg_dist_object has no row for distAddress.
 */
static void
UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
							   int *distribution_argument_index,
							   int *colocationId)
{
	const bool indexOK = true;

	Relation pgDistObjectRel = NULL;
	TupleDesc tupleDescriptor = NULL;
	ScanKeyData scanKey[3];
	SysScanDesc scanDescriptor = NULL;
	HeapTuple heapTuple = NULL;
	Datum values[Natts_pg_dist_object];
	bool isnull[Natts_pg_dist_object];
	bool replace[Natts_pg_dist_object];

	pgDistObjectRel = heap_open(DistObjectRelationId(), RowExclusiveLock);
	tupleDescriptor = RelationGetDescr(pgDistObjectRel);

	/* scan pg_dist_object for classid = $1 AND objid = $2 AND objsubid = $3 via index */
	ScanKeyInit(&scanKey[0], Anum_pg_dist_object_classid, BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(distAddress->classId));
	ScanKeyInit(&scanKey[1], Anum_pg_dist_object_objid, BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(distAddress->objectId));

	/* objsubid is compared with int4 equality, so build an int4 datum for it */
	ScanKeyInit(&scanKey[2], Anum_pg_dist_object_objsubid, BTEqualStrategyNumber,
				F_INT4EQ, Int32GetDatum(distAddress->objectSubId));

	scanDescriptor = systable_beginscan(pgDistObjectRel, DistObjectPrimaryKeyIndexId(),
										indexOK,
										NULL, 3, scanKey);

	heapTuple = systable_getnext(scanDescriptor);
	if (!HeapTupleIsValid(heapTuple))
	{
		/* classId and objectId are Oids (unsigned), objectSubId is a signed int */
		ereport(ERROR, (errmsg("could not find valid entry for node \"%u,%u,%d\" "
							   "in pg_dist_object", distAddress->classId,
							   distAddress->objectId, distAddress->objectSubId)));
	}

	/* only the two columns flagged below are replaced, the rest are kept as is */
	memset(replace, 0, sizeof(replace));

	replace[Anum_pg_dist_object_distribution_argument_index - 1] = true;

	if (distribution_argument_index != NULL)
	{
		values[Anum_pg_dist_object_distribution_argument_index - 1] = Int32GetDatum(
			*distribution_argument_index);
		isnull[Anum_pg_dist_object_distribution_argument_index - 1] = false;
	}
	else
	{
		isnull[Anum_pg_dist_object_distribution_argument_index - 1] = true;
	}

	replace[Anum_pg_dist_object_colocationid - 1] = true;
	if (colocationId != NULL)
	{
		values[Anum_pg_dist_object_colocationid - 1] = Int32GetDatum(*colocationId);
		isnull[Anum_pg_dist_object_colocationid - 1] = false;
	}
	else
	{
		isnull[Anum_pg_dist_object_colocationid - 1] = true;
	}

	heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);

	CatalogTupleUpdate(pgDistObjectRel, &heapTuple->t_self, heapTuple);

	CitusInvalidateRelcacheByRelid(DistObjectRelationId());

	CommandCounterIncrement();

	systable_endscan(scanDescriptor);

	/* the relation is closed with NoLock, i.e. the lock is not released here */
	heap_close(pgDistObjectRel, NoLock);
}
/*
* GetFunctionDDLCommand returns the complete "CREATE OR REPLACE FUNCTION ..." statement for
* the specified function.

View File

@ -37,7 +37,6 @@
static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
Datum *paramValues);
PG_FUNCTION_INFO_V1(master_unmark_object_distributed);
@ -151,7 +150,10 @@ MarkObjectDistributed(const ObjectAddress *distAddress)
/*
* ExecuteCommandAsSuperuser executes a command via SPI as superuser.
* ExecuteCommandAsSuperuser executes a command via SPI as superuser. Using this
* function (and in general SPI/SQL with superuser) should be avoided as much as
* possible. This is to prevent any user from exploiting the superuser access via
* triggers.
*/
static int
ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,

View File

@ -1,7 +1,11 @@
CREATE OR REPLACE FUNCTION create_distributed_function(function_name regprocedure)
CREATE OR REPLACE FUNCTION create_distributed_function(function_name regprocedure,
distribution_arg_name text DEFAULT NULL,
colocate_with text DEFAULT 'default')
RETURNS void
LANGUAGE C CALLED ON NULL INPUT
AS 'MODULE_PATHNAME', $$create_distributed_function$$;
COMMENT ON FUNCTION create_distributed_function(function_name regprocedure)
COMMENT ON FUNCTION create_distributed_function(function_name regprocedure,
distribution_arg_name text,
colocate_with text)
IS 'creates a distributed function';

View File

@ -1,7 +1,11 @@
CREATE OR REPLACE FUNCTION create_distributed_function(function_name regprocedure)
CREATE OR REPLACE FUNCTION create_distributed_function(function_name regprocedure,
distribution_arg_name text DEFAULT NULL,
colocate_with text DEFAULT 'default')
RETURNS void
LANGUAGE C CALLED ON NULL INPUT
AS 'MODULE_PATHNAME', $$create_distributed_function$$;
COMMENT ON FUNCTION create_distributed_function(function_name regprocedure)
COMMENT ON FUNCTION create_distributed_function(function_name regprocedure,
distribution_arg_name text,
colocate_with text)
IS 'creates a distributed function';

View File

@ -18,7 +18,7 @@ CREATE FUNCTION add(integer, integer) RETURNS integer
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
SELECT create_distributed_function('add(int,int)');
SELECT create_distributed_function('add(int,int)', '$1');
create_distributed_function
-----------------------------
@ -39,7 +39,7 @@ CREATE TYPE dup_result AS (f1 int, f2 text);
CREATE FUNCTION dup(int) RETURNS dup_result
AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$
LANGUAGE SQL;
SELECT create_distributed_function('dup(int)');
SELECT create_distributed_function('dup(int)', '$1');
create_distributed_function
-----------------------------
@ -52,6 +52,195 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY
localhost | 57638 | t | (42,"42 is text")
(2 rows)
CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer
AS 'select $1 + $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
CREATE FUNCTION add_without_param_names(integer, integer) RETURNS integer
AS 'select $1 + $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer
AS 'select $1 + $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
-- postgres doesn't accept parameter names in the regprocedure input
SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1');
ERROR: syntax error at or near "int"
LINE 1: SELECT create_distributed_function('add_with_param_names(val...
^
CONTEXT: invalid type name "val1 int"
-- invalid distribution_arg_name
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='test');
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='int');
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
-- invalid distribution_arg_index
SELECT create_distributed_function('add_with_param_names(int, int)', '$0');
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
SELECT create_distributed_function('add_with_param_names(int, int)', '$-1');
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
SELECT create_distributed_function('add_with_param_names(int, int)', '$-10');
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
SELECT create_distributed_function('add_with_param_names(int, int)', '$3');
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
SELECT create_distributed_function('add_with_param_names(int, int)', '$1a');
ERROR: invalid input syntax for integer: "1a"
-- non existing column name
SELECT create_distributed_function('add_with_param_names(int, int)', 'aaa');
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
-- NULL function
SELECT create_distributed_function(NULL);
ERROR: the first parameter for create_distributed_function() should be a single a valid function or procedure name followed by a list of parameters in parantheses
HINT: skip the parameters with OUT argtype as they are not part of the signature in PostgreSQL
-- NULL colocate_with
SELECT create_distributed_function('add_with_param_names(int, int)', '$1', NULL);
ERROR: colocate_with parameter should not be NULL
HINT: To use the default value, set colocate_with option to "default"
-- empty string distribution_arg_index
SELECT create_distributed_function('add_with_param_names(int, int)', '');
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
-- valid distribution with distribution_arg_name
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1');
create_distributed_function
-----------------------------
(1 row)
-- valid distribution with distribution_arg_name -- case insensitive
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='VaL1');
create_distributed_function
-----------------------------
(1 row)
-- valid distribution with distribution_arg_index
SELECT create_distributed_function('add_with_param_names(int, int)','$1');
create_distributed_function
-----------------------------
(1 row)
-- a function cannot be colocated with a table that is not "streaming" replicated
SET citus.shard_replication_factor TO 2;
CREATE TABLE replicated_table_func_test (a int);
SET citus.replication_model TO "statement";
SELECT create_distributed_table('replicated_table_func_test', 'a');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test');
ERROR: cannot colocate function "add_with_param_names" and table "replicated_table_func_test"
DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model.
HINT: When distributing tables make sure that "citus.replication_model" is set to "streaming"
-- a function cannot be colocated with a different distribution argument type
SET citus.shard_replication_factor TO 1;
CREATE TABLE replicated_table_func_test_2 (a bigint);
SET citus.replication_model TO "streaming";
SELECT create_distributed_table('replicated_table_func_test_2', 'a');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_2');
ERROR: cannot colocate function "replicated_table_func_test_2" and table "add_with_param_names" because distribution column types don't match
-- colocate_with cannot be used without distribution key
SELECT create_distributed_function('add_with_param_names(int, int)', colocate_with:='replicated_table_func_test_2');
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
HINT: To provide "colocate_with" option, the distribution argument parameter should also be provided
-- a function cannot be colocated with a local table
CREATE TABLE replicated_table_func_test_3 (a bigint);
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_3');
ERROR: relation replicated_table_func_test_3 is not distributed
-- a function cannot be colocated with a reference table
SELECT create_reference_table('replicated_table_func_test_3');
create_reference_table
------------------------
(1 row)
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_3');
ERROR: cannot colocate function "add_with_param_names" and table "replicated_table_func_test_3" because colocate_with option is only supported for hash distributed tables.
-- finally, colocate the function with a distributed table
SET citus.shard_replication_factor TO 1;
CREATE TABLE replicated_table_func_test_4 (a int);
SET citus.replication_model TO "streaming";
SELECT create_distributed_table('replicated_table_func_test_4', 'a');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test_4');
create_distributed_function
-----------------------------
(1 row)
-- show that the colocationIds are the same
SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated
FROM pg_dist_partition, citus.pg_dist_object as objects
WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND
objects.objid = 'add_with_param_names(int, int)'::regprocedure;
table_and_function_colocated
------------------------------
t
(1 row)
-- now, re-distributed with the default colocation option, we should still see that the same colocation
-- group preserved, because we're using the default shard creationg settings
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1');
create_distributed_function
-----------------------------
(1 row)
SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated
FROM pg_dist_partition, citus.pg_dist_object as objects
WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND
objects.objid = 'add_with_param_names(int, int)'::regprocedure;
table_and_function_colocated
------------------------------
t
(1 row)
-- if not paremeters are supplied, we'd see that function doesn't have
-- distribution_argument_index and colocationid
SELECT create_distributed_function('add_mixed_param_names(int, int)');
create_distributed_function
-----------------------------
(1 row)
SELECT distribution_argument_index is NULL, colocationid is NULL from citus.pg_dist_object
WHERE objid = 'add_mixed_param_names(int, int)'::regprocedure;
?column? | ?column?
----------+----------
t | t
(1 row)
-- also show that we can use the function
SELECT * FROM run_command_on_workers('SELECT function_tests.add_mixed_param_names(2,3);') ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+--------
localhost | 57637 | t | 5
localhost | 57638 | t | 5
(2 rows)
-- clear objects
SET client_min_messages TO error; -- suppress cascading objects dropping
DROP SCHEMA function_tests CASCADE;

View File

@ -1806,7 +1806,7 @@ step s2-public-schema:
step s2-distribute-function:
CREATE OR REPLACE FUNCTION add (INT,INT) RETURNS INT AS $$ SELECT $1 + $2 $$ LANGUAGE SQL;
SELECT create_distributed_function('add(INT,INT)');
SELECT create_distributed_function('add(INT,INT)', '$1');
<waiting ...>
step s1-commit:
COMMIT;
@ -1921,7 +1921,7 @@ step s2-public-schema:
step s2-distribute-function:
CREATE OR REPLACE FUNCTION add (INT,INT) RETURNS INT AS $$ SELECT $1 + $2 $$ LANGUAGE SQL;
SELECT create_distributed_function('add(INT,INT)');
SELECT create_distributed_function('add(INT,INT)', '$1');
create_distributed_function
@ -2045,7 +2045,7 @@ step s2-create-schema:
step s2-distribute-function:
CREATE OR REPLACE FUNCTION add (INT,INT) RETURNS INT AS $$ SELECT $1 + $2 $$ LANGUAGE SQL;
SELECT create_distributed_function('add(INT,INT)');
SELECT create_distributed_function('add(INT,INT)', '$1');
create_distributed_function

View File

@ -117,7 +117,7 @@ step "s2-create-table-with-type"
step "s2-distribute-function"
{
CREATE OR REPLACE FUNCTION add (INT,INT) RETURNS INT AS $$ SELECT $1 + $2 $$ LANGUAGE SQL;
SELECT create_distributed_function('add(INT,INT)');
SELECT create_distributed_function('add(INT,INT)', '$1');
}
step "s2-begin"

View File

@ -15,7 +15,7 @@ CREATE FUNCTION add(integer, integer) RETURNS integer
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
SELECT create_distributed_function('add(int,int)');
SELECT create_distributed_function('add(int,int)', '$1');
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
@ -30,12 +30,120 @@ CREATE FUNCTION dup(int) RETURNS dup_result
AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$
LANGUAGE SQL;
SELECT create_distributed_function('dup(int)');
SELECT create_distributed_function('dup(int)', '$1');
SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2;
CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer
AS 'select $1 + $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
CREATE FUNCTION add_without_param_names(integer, integer) RETURNS integer
AS 'select $1 + $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer
AS 'select $1 + $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
-- postgres doesn't accept parameter names in the regprocedure input
SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1');
-- invalid distribution_arg_name
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='test');
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='int');
-- invalid distribution_arg_index
SELECT create_distributed_function('add_with_param_names(int, int)', '$0');
SELECT create_distributed_function('add_with_param_names(int, int)', '$-1');
SELECT create_distributed_function('add_with_param_names(int, int)', '$-10');
SELECT create_distributed_function('add_with_param_names(int, int)', '$3');
SELECT create_distributed_function('add_with_param_names(int, int)', '$1a');
-- non existing column name
SELECT create_distributed_function('add_with_param_names(int, int)', 'aaa');
-- NULL function
SELECT create_distributed_function(NULL);
-- NULL colocate_with
SELECT create_distributed_function('add_with_param_names(int, int)', '$1', NULL);
-- empty string distribution_arg_index
SELECT create_distributed_function('add_with_param_names(int, int)', '');
-- valid distribution with distribution_arg_name
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1');
-- valid distribution with distribution_arg_name -- case insensitive
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='VaL1');
-- valid distribution with distribution_arg_index
SELECT create_distributed_function('add_with_param_names(int, int)','$1');
-- a function cannot be colocated with a table that is not "streaming" replicated
SET citus.shard_replication_factor TO 2;
CREATE TABLE replicated_table_func_test (a int);
SET citus.replication_model TO "statement";
SELECT create_distributed_table('replicated_table_func_test', 'a');
SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test');
-- a function cannot be colocated with a different distribution argument type
SET citus.shard_replication_factor TO 1;
CREATE TABLE replicated_table_func_test_2 (a bigint);
SET citus.replication_model TO "streaming";
SELECT create_distributed_table('replicated_table_func_test_2', 'a');
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_2');
-- colocate_with cannot be used without distribution key
SELECT create_distributed_function('add_with_param_names(int, int)', colocate_with:='replicated_table_func_test_2');
-- a function cannot be colocated with a local table
CREATE TABLE replicated_table_func_test_3 (a bigint);
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_3');
-- a function cannot be colocated with a reference table
SELECT create_reference_table('replicated_table_func_test_3');
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_3');
-- finally, colocate the function with a distributed table
SET citus.shard_replication_factor TO 1;
CREATE TABLE replicated_table_func_test_4 (a int);
SET citus.replication_model TO "streaming";
SELECT create_distributed_table('replicated_table_func_test_4', 'a');
SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test_4');
-- show that the colocationIds are the same
SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated
FROM pg_dist_partition, citus.pg_dist_object as objects
WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND
objects.objid = 'add_with_param_names(int, int)'::regprocedure;
-- now, re-distributed with the default colocation option, we should still see that the same colocation
-- group preserved, because we're using the default shard creationg settings
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1');
SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated
FROM pg_dist_partition, citus.pg_dist_object as objects
WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND
objects.objid = 'add_with_param_names(int, int)'::regprocedure;
-- if not paremeters are supplied, we'd see that function doesn't have
-- distribution_argument_index and colocationid
SELECT create_distributed_function('add_mixed_param_names(int, int)');
SELECT distribution_argument_index is NULL, colocationid is NULL from citus.pg_dist_object
WHERE objid = 'add_mixed_param_names(int, int)'::regprocedure;
-- also show that we can use the function
SELECT * FROM run_command_on_workers('SELECT function_tests.add_mixed_param_names(2,3);') ORDER BY 1,2;
-- clear objects
SET client_min_messages TO error; -- suppress cascading objects dropping
DROP SCHEMA function_tests CASCADE;
SELECT run_command_on_workers($$DROP SCHEMA function_tests CASCADE;$$);
DROP USER functionuser;
SELECT run_command_on_workers($$DROP USER functionuser;$$);
SELECT run_command_on_workers($$DROP USER functionuser;$$);