Propagate CREATE OR REPLACE FUNCTION to workers for distributed functions (#3043)

DESCRIPTION: Propagate CREATE OR REPLACE FUNCTION

Distributed functions could be replaced, which should be propagated to the workers to keep the function in sync between all nodes.

Due to the complexity of deparsing the `CreateFunctionStmt` we actually produce the plan during the processing phase of our utilityhook. Since the changes have already been made in the catalog tables we can reuse `pg_get_functiondef` to get us the generated `CREATE OR REPLACE` sql.
pull/3050/head^2
Nils Dijk 2019-09-30 12:41:17 +02:00 committed by GitHub
parent 82ec918b29
commit 473cbc0115
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 200 additions and 0 deletions

View File

@ -66,6 +66,7 @@ static void UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
int *colocationId);
static void EnsureSequentialModeForFunctionDDL(void);
static void TriggerSyncMetadataToPrimaryNodes(void);
static bool ShouldPropagateCreateFunction(CreateFunctionStmt *stmt);
static bool ShouldPropagateAlterFunction(const ObjectAddress *address);
static ObjectAddress * FunctionToObjectAddress(ObjectType objectType,
ObjectWithArgs *objectWithArgs,
@ -631,6 +632,57 @@ TriggerSyncMetadataToPrimaryNodes(void)
}
/*
* ShouldPropagateCreateFunction tests if we need to propagate a CREATE FUNCTION
* statement. We only propagate replace's of distributed functions to keep the function on
* the workers in sync with the one on the coordinator.
*/
static bool
ShouldPropagateCreateFunction(CreateFunctionStmt *stmt)
{
const ObjectAddress *address = NULL;
if (creating_extension)
{
/*
* extensions should be created separately on the workers, functions cascading
* from an extension should therefore not be propagated.
*/
return false;
}
if (!EnableDependencyCreation)
{
/*
* we are configured to disable object propagation, should not propagate anything
*/
return false;
}
if (!stmt->replace)
{
/*
* Since we only care for a replace of distributed functions if the statement is
* not a replace we are going to ignore.
*/
return false;
}
/*
* Even though its a replace we should accept an non-existing function, it will just
* not be distributed
*/
address = GetObjectAddressFromParseTree((Node *) stmt, true);
if (!IsObjectDistributed(address))
{
/* do not propagate alter function for non-distributed functions */
return false;
}
return true;
}
/*
* ShouldPropagateAlterFunction returns, based on the address of a function, if alter
* statements targeting the function should be propagated.
@ -665,6 +717,102 @@ ShouldPropagateAlterFunction(const ObjectAddress *address)
}
/*
* PlanCreateFunctionStmt is called during the planning phase for CREATE [OR REPLACE]
* FUNCTION. We primarily care for the replace variant of this statement to keep
* distributed functions in sync. We bail via a check on ShouldPropagateCreateFunction
* which checks for the OR REPLACE modifier.
*
* Since we use pg_get_functiondef to get the ddl command we actually do not do any
* planning here, instead we defer the plan creation to the processing step.
*
* Instead we do our basic housekeeping where we make sure we are on the coordinator and
* can propagate the function in sequential mode.
*/
List *
PlanCreateFunctionStmt(CreateFunctionStmt *stmt, const char *queryString)
{
if (!ShouldPropagateCreateFunction(stmt))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialModeForFunctionDDL();
/*
* ddl jobs will be generated during the Processing phase as we need the function to
* be updated in the catalog to get its sql representation
*/
return NIL;
}
/*
* ProcessCreateFunctionStmt actually creates the plan we need to execute for function
* propagation. This is the downside of using pg_get_functiondef to get the sql statement.
*
* Besides creating the plan we also make sure all (new) dependencies of the function are
* created on all nodes.
*/
List *
ProcessCreateFunctionStmt(CreateFunctionStmt *stmt, const char *queryString)
{
const ObjectAddress *address = NULL;
const char *sql = NULL;
List *commands = NIL;
if (!ShouldPropagateCreateFunction(stmt))
{
return NIL;
}
address = GetObjectAddressFromParseTree((Node *) stmt, false);
EnsureDependenciesExistsOnAllNodes(address);
sql = GetFunctionDDLCommand(address->objectId);
commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
}
/*
* CreateFunctionStmtObjectAddress returns the ObjectAddress for the subject of the
* CREATE [OR REPLACE] FUNCTION statement. If missing_ok is false it will error with the
* normal postgres error for unfound functions.
*/
const ObjectAddress *
CreateFunctionStmtObjectAddress(CreateFunctionStmt *stmt, bool missing_ok)
{
ObjectType objectType = OBJECT_FUNCTION;
ObjectWithArgs *objectWithArgs = NULL;
ListCell *parameterCell = NULL;
#if PG_VERSION_NUM > 110000
if (stmt->is_procedure)
{
objectType = OBJECT_PROCEDURE;
}
#endif
objectWithArgs = makeNode(ObjectWithArgs);
objectWithArgs->objname = stmt->funcname;
foreach(parameterCell, stmt->parameters)
{
FunctionParameter *funcParam = castNode(FunctionParameter, lfirst(parameterCell));
objectWithArgs->objargs = lappend(objectWithArgs->objargs, funcParam->argType);
}
return FunctionToObjectAddress(objectType, objectWithArgs, missing_ok);
}
/*
* PlanAlterFunctionStmt is invoked for alter function statements with actions. Here we
* plan the jobs to be executed on the workers for functions that have been distributed in

View File

@ -554,6 +554,12 @@ multi_ProcessUtility(PlannedStmt *pstmt,
queryString);
}
if (IsA(parsetree, CreateFunctionStmt))
{
ddlJobs = PlanCreateFunctionStmt(castNode(CreateFunctionStmt, parsetree),
queryString);
}
/*
* ALTER TABLE ALL IN TABLESPACE statements have their node type as
* AlterTableMoveAllStmt. At the moment we do not support this functionality in
@ -706,6 +712,13 @@ multi_ProcessUtility(PlannedStmt *pstmt,
{
ProcessAlterEnumStmt(castNode(AlterEnumStmt, parsetree), queryString);
}
if (IsA(parsetree, CreateFunctionStmt))
{
Assert(ddlJobs == NIL); /* jobs should not have been set before */
ddlJobs = ProcessCreateFunctionStmt(castNode(CreateFunctionStmt, parsetree),
queryString);
}
}
/*

View File

@ -82,6 +82,12 @@ GetObjectAddressFromParseTree(Node *parseTree, bool missing_ok)
missing_ok);
}
case T_CreateFunctionStmt:
{
return CreateFunctionStmtObjectAddress(
castNode(CreateFunctionStmt, parseTree), missing_ok);
}
default:
{
/*

View File

@ -51,6 +51,11 @@ extern bool ConstraintIsAForeignKey(char *constraintName, Oid relationId);
/* function.c - forward declarations */
extern List * PlanCreateFunctionStmt(CreateFunctionStmt *stmt, const char *queryString);
extern List * ProcessCreateFunctionStmt(CreateFunctionStmt *stmt, const
char *queryString);
extern const ObjectAddress * CreateFunctionStmtObjectAddress(CreateFunctionStmt *stmt,
bool missing_ok);
extern List * PlanAlterFunctionStmt(AlterFunctionStmt *stmt, const char *queryString);
extern const ObjectAddress * AlterFunctionStmtObjectAddress(AlterFunctionStmt *stmt,
bool missing_ok);

View File

@ -288,6 +288,25 @@ SELECT * FROM run_command_on_workers('SELECT function_tests2.add(2,3);') ORDER B
(2 rows)
ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests;
-- when a function is distributed and we create or replace the function we need to propagate the statement to the worker to keep it in sync with the coordinator
CREATE OR REPLACE FUNCTION add(integer, integer) RETURNS integer
AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us if the update succeeded
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+--------
localhost | 57637 | t | 6
localhost | 57638 | t | 6
(2 rows)
DROP FUNCTION add(int,int);
-- call should fail as function should have been dropped
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;

View File

@ -149,6 +149,15 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY
SELECT * FROM run_command_on_workers('SELECT function_tests2.add(2,3);') ORDER BY 1,2;
ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests;
-- when a function is distributed and we create or replace the function we need to propagate the statement to the worker to keep it in sync with the coordinator
CREATE OR REPLACE FUNCTION add(integer, integer) RETURNS integer
AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us if the update succeeded
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
DROP FUNCTION add(int,int);
-- call should fail as function should have been dropped
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;