Introduce create_distributed_function(regproc) UDF (#2961)

This PR aims to add the minimal set of changes required to start
distributing functions. You can use create_distributed_function(regproc)
UDF to distribute a function.

    SELECT create_distributed_function('add(int,int)');

The function definition should include the param types to properly
identify the correct function that we wish to distribute
pull/2975/head
Hanefi Onaldi 2019-09-13 23:27:46 +03:00 committed by GitHub
parent 012595da11
commit 8f2a3a0604
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 272 additions and 0 deletions

View File

@ -0,0 +1,142 @@
/*-------------------------------------------------------------------------
*
* function.c
* Commands for FUNCTION statements.
*
* We currently support replicating function definitions on the
* coordinator in all the worker nodes in the form of
*
* CREATE OR REPLACE FUNCTION ... queries.
*
* ALTER or DROP operations are not yet propagated.
*
* Copyright (c) 2019, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/xact.h"
#include "catalog/namespace.h"
#include "catalog/pg_proc.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata/distobject.h"
#include "distributed/multi_executor.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/worker_transaction.h"
#include "utils/builtins.h"
#include "utils/fmgrprotos.h"
/* forward declaration for helper functions*/
static const char * GetFunctionDDLCommand(Oid funcOid);
static void EnsureSequentialModeForFunctionDDL(void);
PG_FUNCTION_INFO_V1(create_distributed_function);
/*
* create_distributed_function gets a function or procedure name with their list of
* argument types in parantheses, then it creates a new distributed function.
*/
Datum
create_distributed_function(PG_FUNCTION_ARGS)
{
RegProcedure funcOid = PG_GETARG_OID(0);
const char *ddlCommand = NULL;
ObjectAddress functionAddress = { 0 };
/* if called on NULL input, error out */
if (funcOid == InvalidOid)
{
ereport(ERROR, (errmsg("create_distributed_function() requires a single "
"parameter that is a valid function or procedure name "
"followed by a list of parameters in parantheses"),
errhint("skip the parameters with OUT argtype as they are not "
"part of the signature in PostgreSQL")));
}
ObjectAddressSet(functionAddress, ProcedureRelationId, funcOid);
/*
* when we allow propagation within a transaction block we should make sure to only
* allow this in sequential mode
*/
EnsureSequentialModeForFunctionDDL();
EnsureDependenciesExistsOnAllNodes(&functionAddress);
ddlCommand = GetFunctionDDLCommand(funcOid);
SendCommandToWorkers(ALL_WORKERS, ddlCommand);
MarkObjectDistributed(&functionAddress);
PG_RETURN_VOID();
}
/*
* GetFunctionDDLCommand returns the complete "CREATE OR REPLACE FUNCTION ..." statement for
* the specified function.
*/
static const char *
GetFunctionDDLCommand(RegProcedure funcOid)
{
OverrideSearchPath *overridePath = NULL;
Datum sqlTextDatum = 0;
const char *sql = NULL;
/*
* Set search_path to NIL so that all objects outside of pg_catalog will be
* schema-prefixed. pg_catalog will be added automatically when we call
* PushOverrideSearchPath(), since we set addCatalog to true;
*/
overridePath = GetOverrideSearchPath(CurrentMemoryContext);
overridePath->schemas = NIL;
overridePath->addCatalog = true;
PushOverrideSearchPath(overridePath);
sqlTextDatum = DirectFunctionCall1(pg_get_functiondef,
ObjectIdGetDatum(funcOid));
/* revert back to original search_path */
PopOverrideSearchPath();
sql = TextDatumGetCString(sqlTextDatum);
return sql;
}
/*
* EnsureSequentialModeForFunctionDDL makes sure that the current transaction is already in
* sequential mode, or can still safely be put in sequential mode, it errors if that is
* not possible. The error contains information for the user to retry the transaction with
* sequential mode set from the beginnig.
*
* As functions are node scoped objects there exists only 1 instance of the function used by
* potentially multiple shards. To make sure all shards in the transaction can interact
* with the function the function needs to be visible on all connections used by the transaction,
* meaning we can only use 1 connection per node.
*/
static void
EnsureSequentialModeForFunctionDDL(void)
{
if (ParallelQueryExecutedInTransaction())
{
ereport(ERROR, (errmsg("cannot create function because there was a "
"parallel operation on a distributed table in the "
"transaction"),
errdetail("When creating a distributed function, Citus needs to "
"perform all operations over a single connection per "
"node to ensure consistency."),
errhint("Try re-running the transaction with "
"\"SET LOCAL citus.multi_shard_modify_mode TO "
"\'sequential\';\"")));
}
ereport(DEBUG1, (errmsg("switching to sequential query execution mode"),
errdetail(
"A distributed function is created. To make sure subsequent "
"commands see the type correctly we need to make sure to "
"use only one connection for all future commands")));
SetLocalMultiShardModifyModeToSequential();
}

View File

@ -41,6 +41,8 @@ CREATE TABLE citus.pg_dist_object (
CONSTRAINT pg_dist_object_pkey PRIMARY KEY (classid, objid, objsubid)
);
#include "udfs/create_distributed_function/9.0-1.sql"
#include "udfs/citus_drop_trigger/9.0-1.sql"
#include "udfs/citus_prepare_pg_upgrade/9.0-1.sql"
#include "udfs/citus_finish_pg_upgrade/9.0-1.sql"

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION create_distributed_function(function_name regprocedure)
RETURNS void
LANGUAGE C CALLED ON NULL INPUT
AS 'MODULE_PATHNAME', $$create_distributed_function$$;
COMMENT ON FUNCTION create_distributed_function(function_name regprocedure)
IS 'creates a distributed function';

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION create_distributed_function(function_name regprocedure)
RETURNS void
LANGUAGE C CALLED ON NULL INPUT
AS 'MODULE_PATHNAME', $$create_distributed_function$$;
COMMENT ON FUNCTION create_distributed_function(function_name regprocedure)
IS 'creates a distributed function';

View File

@ -0,0 +1,72 @@
SET citus.next_shard_id TO 20020000;
CREATE USER functionuser;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
SELECT run_command_on_workers($$CREATE USER functionuser;$$);
run_command_on_workers
-----------------------------------
(localhost,57637,t,"CREATE ROLE")
(localhost,57638,t,"CREATE ROLE")
(2 rows)
CREATE SCHEMA function_tests AUTHORIZATION functionuser;
SET search_path TO function_tests;
SET citus.shard_count TO 4;
-- Create and distribute a simple function
CREATE FUNCTION add(integer, integer) RETURNS integer
AS 'select $1 + $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
SELECT create_distributed_function('add(int,int)');
create_distributed_function
-----------------------------
(1 row)
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+--------
localhost | 57637 | t | 5
localhost | 57638 | t | 5
(2 rows)
-- Test some combination of functions without ddl propagation
-- This will prevent the workers from having those types created. They are
-- created just-in-time on function distribution
SET citus.enable_ddl_propagation TO off;
CREATE TYPE dup_result AS (f1 int, f2 text);
CREATE FUNCTION dup(int) RETURNS dup_result
AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$
LANGUAGE SQL;
SELECT create_distributed_function('dup(int)');
create_distributed_function
-----------------------------
(1 row)
SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+-------------------
localhost | 57637 | t | (42,"42 is text")
localhost | 57638 | t | (42,"42 is text")
(2 rows)
-- clear objects
SET client_min_messages TO fatal; -- suppress cascading objects dropping
DROP SCHEMA function_tests CASCADE;
SELECT run_command_on_workers($$DROP SCHEMA function_tests CASCADE;$$);
run_command_on_workers
-----------------------------------
(localhost,57637,t,"DROP SCHEMA")
(localhost,57638,t,"DROP SCHEMA")
(2 rows)
DROP USER functionuser;
SELECT run_command_on_workers($$DROP USER functionuser;$$);
run_command_on_workers
---------------------------------
(localhost,57637,t,"DROP ROLE")
(localhost,57638,t,"DROP ROLE")
(2 rows)

View File

@ -281,3 +281,4 @@ test: ssl_by_default
# object distribution tests
# ---------
test: distributed_types
test: distributed_functions

View File

@ -0,0 +1,41 @@
SET citus.next_shard_id TO 20020000;
CREATE USER functionuser;
SELECT run_command_on_workers($$CREATE USER functionuser;$$);
CREATE SCHEMA function_tests AUTHORIZATION functionuser;
SET search_path TO function_tests;
SET citus.shard_count TO 4;
-- Create and distribute a simple function
CREATE FUNCTION add(integer, integer) RETURNS integer
AS 'select $1 + $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
SELECT create_distributed_function('add(int,int)');
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
-- Test some combination of functions without ddl propagation
-- This will prevent the workers from having those types created. They are
-- created just-in-time on function distribution
SET citus.enable_ddl_propagation TO off;
CREATE TYPE dup_result AS (f1 int, f2 text);
CREATE FUNCTION dup(int) RETURNS dup_result
AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$
LANGUAGE SQL;
SELECT create_distributed_function('dup(int)');
SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2;
-- clear objects
SET client_min_messages TO fatal; -- suppress cascading objects dropping
DROP SCHEMA function_tests CASCADE;
SELECT run_command_on_workers($$DROP SCHEMA function_tests CASCADE;$$);
DROP USER functionuser;
SELECT run_command_on_workers($$DROP USER functionuser;$$);