From 8f2a3a06046cfe711b29d1388ff401af1ead9d77 Mon Sep 17 00:00:00 2001 From: Hanefi Onaldi Date: Fri, 13 Sep 2019 23:27:46 +0300 Subject: [PATCH] Introduce create_distributed_function(regproc) UDF (#2961) This PR aims to add the minimal set of changes required to start distributing functions. You can use create_distributed_function(regproc) UDF to distribute a function. SELECT create_distributed_function('add(int,int)'); The function definition should include the param types to properly identify the correct function that we wish to distribute --- src/backend/distributed/commands/function.c | 142 ++++++++++++++++++ .../distributed/sql/citus--8.3-1--9.0-1.sql | 2 + .../create_distributed_function/9.0-1.sql | 7 + .../create_distributed_function/latest.sql | 7 + .../expected/distributed_functions.out | 72 +++++++++ src/test/regress/multi_schedule | 1 + .../regress/sql/distributed_functions.sql | 41 +++++ 7 files changed, 272 insertions(+) create mode 100644 src/backend/distributed/commands/function.c create mode 100644 src/backend/distributed/sql/udfs/create_distributed_function/9.0-1.sql create mode 100644 src/backend/distributed/sql/udfs/create_distributed_function/latest.sql create mode 100644 src/test/regress/expected/distributed_functions.out create mode 100644 src/test/regress/sql/distributed_functions.sql diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c new file mode 100644 index 000000000..a816f8040 --- /dev/null +++ b/src/backend/distributed/commands/function.c @@ -0,0 +1,142 @@ +/*------------------------------------------------------------------------- + * + * function.c + * Commands for FUNCTION statements. + * + * We currently support replicating function definitions on the + * coordinator in all the worker nodes in the form of + * + * CREATE OR REPLACE FUNCTION ... queries. + * + * ALTER or DROP operations are not yet propagated. + * + * Copyright (c) 2019, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xact.h" +#include "catalog/namespace.h" +#include "catalog/pg_proc.h" +#include "distributed/metadata_sync.h" +#include "distributed/metadata/distobject.h" +#include "distributed/multi_executor.h" +#include "distributed/relation_access_tracking.h" +#include "distributed/worker_transaction.h" +#include "utils/builtins.h" +#include "utils/fmgrprotos.h" + +/* forward declaration for helper functions*/ +static const char * GetFunctionDDLCommand(Oid funcOid); +static void EnsureSequentialModeForFunctionDDL(void); + +PG_FUNCTION_INFO_V1(create_distributed_function); + +/* + * create_distributed_function gets a function or procedure name with their list of + * argument types in parantheses, then it creates a new distributed function. + */ +Datum +create_distributed_function(PG_FUNCTION_ARGS) +{ + RegProcedure funcOid = PG_GETARG_OID(0); + const char *ddlCommand = NULL; + ObjectAddress functionAddress = { 0 }; + + /* if called on NULL input, error out */ + if (funcOid == InvalidOid) + { + ereport(ERROR, (errmsg("create_distributed_function() requires a single " + "parameter that is a valid function or procedure name " + "followed by a list of parameters in parantheses"), + errhint("skip the parameters with OUT argtype as they are not " + "part of the signature in PostgreSQL"))); + } + + ObjectAddressSet(functionAddress, ProcedureRelationId, funcOid); + + /* + * when we allow propagation within a transaction block we should make sure to only + * allow this in sequential mode + */ + EnsureSequentialModeForFunctionDDL(); + + EnsureDependenciesExistsOnAllNodes(&functionAddress); + + ddlCommand = GetFunctionDDLCommand(funcOid); + SendCommandToWorkers(ALL_WORKERS, ddlCommand); + + MarkObjectDistributed(&functionAddress); + + PG_RETURN_VOID(); +} + + +/* + * GetFunctionDDLCommand returns the complete "CREATE OR REPLACE FUNCTION ..." statement for + * the specified function. + */ +static const char * +GetFunctionDDLCommand(RegProcedure funcOid) +{ + OverrideSearchPath *overridePath = NULL; + Datum sqlTextDatum = 0; + const char *sql = NULL; + + /* + * Set search_path to NIL so that all objects outside of pg_catalog will be + * schema-prefixed. pg_catalog will be added automatically when we call + * PushOverrideSearchPath(), since we set addCatalog to true; + */ + overridePath = GetOverrideSearchPath(CurrentMemoryContext); + overridePath->schemas = NIL; + overridePath->addCatalog = true; + PushOverrideSearchPath(overridePath); + + sqlTextDatum = DirectFunctionCall1(pg_get_functiondef, + ObjectIdGetDatum(funcOid)); + + /* revert back to original search_path */ + PopOverrideSearchPath(); + + sql = TextDatumGetCString(sqlTextDatum); + return sql; +} + + +/* + * EnsureSequentialModeForFunctionDDL makes sure that the current transaction is already in + * sequential mode, or can still safely be put in sequential mode, it errors if that is + * not possible. The error contains information for the user to retry the transaction with + * sequential mode set from the beginnig. + * + * As functions are node scoped objects there exists only 1 instance of the function used by + * potentially multiple shards. To make sure all shards in the transaction can interact + * with the function the function needs to be visible on all connections used by the transaction, + * meaning we can only use 1 connection per node. + */ +static void +EnsureSequentialModeForFunctionDDL(void) +{ + if (ParallelQueryExecutedInTransaction()) + { + ereport(ERROR, (errmsg("cannot create function because there was a " + "parallel operation on a distributed table in the " + "transaction"), + errdetail("When creating a distributed function, Citus needs to " + "perform all operations over a single connection per " + "node to ensure consistency."), + errhint("Try re-running the transaction with " + "\"SET LOCAL citus.multi_shard_modify_mode TO " + "\'sequential\';\""))); + } + + ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), + errdetail( + "A distributed function is created. To make sure subsequent " + "commands see the type correctly we need to make sure to " + "use only one connection for all future commands"))); + SetLocalMultiShardModifyModeToSequential(); +} diff --git a/src/backend/distributed/sql/citus--8.3-1--9.0-1.sql b/src/backend/distributed/sql/citus--8.3-1--9.0-1.sql index 79a5d40cb..417be0adb 100644 --- a/src/backend/distributed/sql/citus--8.3-1--9.0-1.sql +++ b/src/backend/distributed/sql/citus--8.3-1--9.0-1.sql @@ -41,6 +41,8 @@ CREATE TABLE citus.pg_dist_object ( CONSTRAINT pg_dist_object_pkey PRIMARY KEY (classid, objid, objsubid) ); +#include "udfs/create_distributed_function/9.0-1.sql" + #include "udfs/citus_drop_trigger/9.0-1.sql" #include "udfs/citus_prepare_pg_upgrade/9.0-1.sql" #include "udfs/citus_finish_pg_upgrade/9.0-1.sql" diff --git a/src/backend/distributed/sql/udfs/create_distributed_function/9.0-1.sql b/src/backend/distributed/sql/udfs/create_distributed_function/9.0-1.sql new file mode 100644 index 000000000..8b7cc457b --- /dev/null +++ b/src/backend/distributed/sql/udfs/create_distributed_function/9.0-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION create_distributed_function(function_name regprocedure) + RETURNS void + LANGUAGE C CALLED ON NULL INPUT + AS 'MODULE_PATHNAME', $$create_distributed_function$$; + +COMMENT ON FUNCTION create_distributed_function(function_name regprocedure) + IS 'creates a distributed function'; diff --git a/src/backend/distributed/sql/udfs/create_distributed_function/latest.sql b/src/backend/distributed/sql/udfs/create_distributed_function/latest.sql new file mode 100644 index 000000000..8b7cc457b --- /dev/null +++ b/src/backend/distributed/sql/udfs/create_distributed_function/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION create_distributed_function(function_name regprocedure) + RETURNS void + LANGUAGE C CALLED ON NULL INPUT + AS 'MODULE_PATHNAME', $$create_distributed_function$$; + +COMMENT ON FUNCTION create_distributed_function(function_name regprocedure) + IS 'creates a distributed function'; diff --git a/src/test/regress/expected/distributed_functions.out b/src/test/regress/expected/distributed_functions.out new file mode 100644 index 000000000..ad92d891e --- /dev/null +++ b/src/test/regress/expected/distributed_functions.out @@ -0,0 +1,72 @@ +SET citus.next_shard_id TO 20020000; +CREATE USER functionuser; +NOTICE: not propagating CREATE ROLE/USER commands to worker nodes +HINT: Connect to worker nodes directly to manually create all necessary users and roles. +SELECT run_command_on_workers($$CREATE USER functionuser;$$); + run_command_on_workers +----------------------------------- + (localhost,57637,t,"CREATE ROLE") + (localhost,57638,t,"CREATE ROLE") +(2 rows) + +CREATE SCHEMA function_tests AUTHORIZATION functionuser; +SET search_path TO function_tests; +SET citus.shard_count TO 4; +-- Create and distribute a simple function +CREATE FUNCTION add(integer, integer) RETURNS integer + AS 'select $1 + $2;' + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; +SELECT create_distributed_function('add(int,int)'); + create_distributed_function +----------------------------- + +(1 row) + +SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+-------- + localhost | 57637 | t | 5 + localhost | 57638 | t | 5 +(2 rows) + +-- Test some combination of functions without ddl propagation +-- This will prevent the workers from having those types created. They are +-- created just-in-time on function distribution +SET citus.enable_ddl_propagation TO off; +CREATE TYPE dup_result AS (f1 int, f2 text); +CREATE FUNCTION dup(int) RETURNS dup_result + AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$ + LANGUAGE SQL; +SELECT create_distributed_function('dup(int)'); + create_distributed_function +----------------------------- + +(1 row) + +SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+------------------- + localhost | 57637 | t | (42,"42 is text") + localhost | 57638 | t | (42,"42 is text") +(2 rows) + +-- clear objects +SET client_min_messages TO fatal; -- suppress cascading objects dropping +DROP SCHEMA function_tests CASCADE; +SELECT run_command_on_workers($$DROP SCHEMA function_tests CASCADE;$$); + run_command_on_workers +----------------------------------- + (localhost,57637,t,"DROP SCHEMA") + (localhost,57638,t,"DROP SCHEMA") +(2 rows) + +DROP USER functionuser; +SELECT run_command_on_workers($$DROP USER functionuser;$$); + run_command_on_workers +--------------------------------- + (localhost,57637,t,"DROP ROLE") + (localhost,57638,t,"DROP ROLE") +(2 rows) + diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 8a93290eb..aa55096cb 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -281,3 +281,4 @@ test: ssl_by_default # object distribution tests # --------- test: distributed_types +test: distributed_functions diff --git a/src/test/regress/sql/distributed_functions.sql b/src/test/regress/sql/distributed_functions.sql new file mode 100644 index 000000000..84bd0fe19 --- /dev/null +++ b/src/test/regress/sql/distributed_functions.sql @@ -0,0 +1,41 @@ +SET citus.next_shard_id TO 20020000; + +CREATE USER functionuser; +SELECT run_command_on_workers($$CREATE USER functionuser;$$); + +CREATE SCHEMA function_tests AUTHORIZATION functionuser; + +SET search_path TO function_tests; +SET citus.shard_count TO 4; + + +-- Create and distribute a simple function +CREATE FUNCTION add(integer, integer) RETURNS integer + AS 'select $1 + $2;' + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; +SELECT create_distributed_function('add(int,int)'); +SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; + + +-- Test some combination of functions without ddl propagation +-- This will prevent the workers from having those types created. They are +-- created just-in-time on function distribution +SET citus.enable_ddl_propagation TO off; + +CREATE TYPE dup_result AS (f1 int, f2 text); + +CREATE FUNCTION dup(int) RETURNS dup_result + AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$ + LANGUAGE SQL; + +SELECT create_distributed_function('dup(int)'); +SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2; + +-- clear objects +SET client_min_messages TO fatal; -- suppress cascading objects dropping +DROP SCHEMA function_tests CASCADE; +SELECT run_command_on_workers($$DROP SCHEMA function_tests CASCADE;$$); +DROP USER functionuser; +SELECT run_command_on_workers($$DROP USER functionuser;$$); \ No newline at end of file