mirror of https://github.com/citusdata/citus.git
Disallow distributed functions for functions depending on an extension (#3049)
DESCRIPTION: Disallow distributed functions for functions depending on an extension Functions depending on an extension cannot (yet) be distributed by citus. If we would allow this it would cause issues with our dependency following mechanism as we stop following objects depending on an extension. By not allowing functions to be distributed when they depend on an extension as well as not allowing to make distributed functions depend on an extension we won't break the ability to add new nodes. Allowing functions depending on extensions to be distributed at the moment could cause problems in that area.pull/3052/head
parent
4d991f281c
commit
01b26cf91a
|
@ -72,6 +72,7 @@ static ObjectAddress * FunctionToObjectAddress(ObjectType objectType,
|
|||
ObjectWithArgs *objectWithArgs,
|
||||
bool missing_ok);
|
||||
static void ErrorIfUnsupportedAlterFunctionStmt(AlterFunctionStmt *stmt);
|
||||
static void ErrorIfFunctionDependsOnExtension(const ObjectAddress *functionAddress);
|
||||
|
||||
|
||||
PG_FUNCTION_INFO_V1(create_distributed_function);
|
||||
|
@ -145,6 +146,7 @@ create_distributed_function(PG_FUNCTION_ARGS)
|
|||
|
||||
|
||||
ObjectAddressSet(functionAddress, ProcedureRelationId, funcOid);
|
||||
ErrorIfFunctionDependsOnExtension(&functionAddress);
|
||||
|
||||
/*
|
||||
* when we allow propagation within a transaction block we should make sure to only
|
||||
|
@ -1061,6 +1063,78 @@ PlanDropFunctionStmt(DropStmt *stmt, const char *queryString)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* PlanAlterFunctionDependsStmt is called during the planning phase of an
|
||||
* ALTER FUNCION ... DEPENDS ON EXTENSION ... statement. Since functions depending on
|
||||
* extensions are assumed to be Owned by an extension we assume the extension to keep the
|
||||
* function in sync.
|
||||
*
|
||||
* If we would allow users to create a dependency between a distributed function and an
|
||||
* extension our pruning logic for which objects to distribute as dependencies of other
|
||||
* objects will change significantly which could cause issues adding new workers. Hence we
|
||||
* don't allow this dependency to be created.
|
||||
*/
|
||||
List *
|
||||
PlanAlterFunctionDependsStmt(AlterObjectDependsStmt *stmt, const char *queryString)
|
||||
{
|
||||
const ObjectAddress *address = NULL;
|
||||
const char *functionName = NULL;
|
||||
|
||||
AssertIsFunctionOrProcedure(stmt->objectType);
|
||||
|
||||
if (creating_extension)
|
||||
{
|
||||
/*
|
||||
* extensions should be created separately on the workers, types cascading from an
|
||||
* extension should therefor not be propagated here.
|
||||
*/
|
||||
return NIL;
|
||||
}
|
||||
|
||||
if (!EnableDependencyCreation)
|
||||
{
|
||||
/*
|
||||
* we are configured to disable object propagation, should not propagate anything
|
||||
*/
|
||||
return NIL;
|
||||
}
|
||||
|
||||
address = GetObjectAddressFromParseTree((Node *) stmt, true);
|
||||
if (!IsObjectDistributed(address))
|
||||
{
|
||||
return NIL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Distributed objects should not start depending on an extension, this will break
|
||||
* the dependency resolving mechanism we use to replicate distributed objects to new
|
||||
* workers
|
||||
*/
|
||||
|
||||
functionName = getObjectIdentity(address);
|
||||
ereport(ERROR, (errmsg("distrtibuted functions are not allowed to depend on an "
|
||||
"extension"),
|
||||
errdetail("Function \"%s\" is already distributed. Functions from "
|
||||
"extensions are expected to be created on the workers by "
|
||||
"the extension they depend on.", functionName)));
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AlterFunctionDependsStmtObjectAddress resolves the ObjectAddress of the function that
|
||||
* is the subject of an ALTER FUNCTION ... DEPENS ON EXTENSION ... statement. If
|
||||
* missing_ok is set to false the lookup will raise an error.
|
||||
*/
|
||||
const ObjectAddress *
|
||||
AlterFunctionDependsStmtObjectAddress(AlterObjectDependsStmt *stmt, bool missing_ok)
|
||||
{
|
||||
AssertIsFunctionOrProcedure(stmt->objectType);
|
||||
|
||||
return FunctionToObjectAddress(stmt->objectType,
|
||||
castNode(ObjectWithArgs, stmt->object), missing_ok);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ProcessAlterFunctionSchemaStmt is executed after the change has been applied locally,
|
||||
* we can now use the new dependencies of the function to ensure all its dependencies
|
||||
|
@ -1246,3 +1320,28 @@ ErrorIfUnsupportedAlterFunctionStmt(AlterFunctionStmt *stmt)
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfFunctionDependsOnExtension functions depending on extensions should raise an
|
||||
* error informing the user why they can't be distributed.
|
||||
*/
|
||||
static void
|
||||
ErrorIfFunctionDependsOnExtension(const ObjectAddress *functionAddress)
|
||||
{
|
||||
/* captures the extension address during lookup */
|
||||
ObjectAddress extensionAddress = { 0 };
|
||||
|
||||
if (IsObjectAddressOwnedByExtension(functionAddress, &extensionAddress))
|
||||
{
|
||||
char *functionName = getObjectIdentity(functionAddress);
|
||||
char *extensionName = getObjectIdentity(&extensionAddress);
|
||||
ereport(ERROR, (errmsg("unable to create a distributed function from functions "
|
||||
"owned by an extension"),
|
||||
errdetail("Function \"%s\" has a dependency on extension \"%s\". "
|
||||
"Functions depending on an extension cannot be "
|
||||
"distributed. Create the function by creating the "
|
||||
"extension on the workers.", functionName,
|
||||
extensionName)));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -74,6 +74,8 @@ static char * CurrentSearchPath(void);
|
|||
static void PostProcessUtility(Node *parsetree);
|
||||
static List * PlanRenameAttributeStmt(RenameStmt *stmt, const char *queryString);
|
||||
static List * PlanAlterOwnerStmt(AlterOwnerStmt *stmt, const char *queryString);
|
||||
static List * PlanAlterObjectDependsStmt(AlterObjectDependsStmt *stmt,
|
||||
const char *queryString);
|
||||
|
||||
static void ExecuteNodeBaseDDLCommands(List *taskList);
|
||||
|
||||
|
@ -560,6 +562,12 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
|||
queryString);
|
||||
}
|
||||
|
||||
if (IsA(parsetree, AlterObjectDependsStmt))
|
||||
{
|
||||
ddlJobs = PlanAlterObjectDependsStmt(
|
||||
castNode(AlterObjectDependsStmt, parsetree), queryString);
|
||||
}
|
||||
|
||||
/*
|
||||
* ALTER TABLE ALL IN TABLESPACE statements have their node type as
|
||||
* AlterTableMoveAllStmt. At the moment we do not support this functionality in
|
||||
|
@ -852,6 +860,33 @@ PlanAlterOwnerStmt(AlterOwnerStmt *stmt, const char *queryString)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* PlanAlterObjectDependsStmt gets called during the planning phase for
|
||||
* ALTER ... DEPENDS ON EXTENSION ... statements. Based on the object type we call out to
|
||||
* a specialized implementation. If no implementation is available we do nothing, eg. we
|
||||
* allow the local node to execute.
|
||||
*/
|
||||
static List *
|
||||
PlanAlterObjectDependsStmt(AlterObjectDependsStmt *stmt, const char *queryString)
|
||||
{
|
||||
switch (stmt->objectType)
|
||||
{
|
||||
#if PG_VERSION_NUM > 110000
|
||||
case OBJECT_PROCEDURE:
|
||||
#endif
|
||||
case OBJECT_FUNCTION:
|
||||
{
|
||||
return PlanAlterFunctionDependsStmt(stmt, queryString);
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
return NIL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ExecuteDistributedDDLJob simply executes a provided DDLJob in a distributed trans-
|
||||
* action, including metadata sync if needed. If the multi shard commit protocol is
|
||||
|
|
|
@ -24,6 +24,8 @@ static const ObjectAddress * RenameAttributeStmtObjectAddress(RenameStmt *stmt,
|
|||
bool missing_ok);
|
||||
static const ObjectAddress * AlterOwnerStmtObjectAddress(AlterOwnerStmt *stmt,
|
||||
bool missing_ok);
|
||||
static const ObjectAddress * AlterObjectDependsStmtObjectAddress(
|
||||
AlterObjectDependsStmt *stmt, bool missing_ok);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -88,6 +90,12 @@ GetObjectAddressFromParseTree(Node *parseTree, bool missing_ok)
|
|||
castNode(CreateFunctionStmt, parseTree), missing_ok);
|
||||
}
|
||||
|
||||
case T_AlterObjectDependsStmt:
|
||||
{
|
||||
return AlterObjectDependsStmtObjectAddress(
|
||||
castNode(AlterObjectDependsStmt, parseTree), missing_ok);
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
/*
|
||||
|
@ -225,3 +233,35 @@ AlterOwnerStmtObjectAddress(AlterOwnerStmt *stmt, bool missing_ok)
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AlterObjectDependsStmtObjectAddress resolves the ObjectAddress for the object targeted
|
||||
* by the AlterObjectDependStmt. This is done by dispatching the call to the object
|
||||
* specific implementation based on the ObjectType captured in the original statement. If
|
||||
* a specific implementation is not present an error will be raised. This is a developer
|
||||
* error since this function should only be reachable by calls of supported types.
|
||||
*
|
||||
* If missing_ok is set to fails the object specific implementation is supposed to raise
|
||||
* an error explaining the user the object is not existing.
|
||||
*/
|
||||
static const ObjectAddress *
|
||||
AlterObjectDependsStmtObjectAddress(AlterObjectDependsStmt *stmt, bool missing_ok)
|
||||
{
|
||||
switch (stmt->objectType)
|
||||
{
|
||||
#if PG_VERSION_NUM > 110000
|
||||
case OBJECT_PROCEDURE:
|
||||
#endif
|
||||
case OBJECT_FUNCTION:
|
||||
{
|
||||
return AlterFunctionDependsStmtObjectAddress(stmt, missing_ok);
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
ereport(ERROR, (errmsg("unsupported alter depends on extension statement to "
|
||||
"get object address for")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -60,9 +60,6 @@ static bool FollowNewSupportedDependencies(void *context, Form_pg_depend pg_depe
|
|||
static void ApplyAddToDependencyList(void *context, Form_pg_depend pg_depend);
|
||||
static List * ExpandCitusSupportedTypes(void *context, const ObjectAddress *target);
|
||||
|
||||
/* forward declaration of support functions to decide what to follow */
|
||||
static bool IsObjectAddressOwnedByExtension(const ObjectAddress *target);
|
||||
|
||||
|
||||
/*
|
||||
* GetDependenciesForObject returns a list of ObjectAddesses to be created in order
|
||||
|
@ -403,9 +400,13 @@ SupportedDependencyByCitus(const ObjectAddress *address)
|
|||
* IsObjectAddressOwnedByExtension returns whether or not the object is owned by an
|
||||
* extension. It is assumed that an object having a dependency on an extension is created
|
||||
* by that extension and therefore owned by that extension.
|
||||
*
|
||||
* If extensionAddress is not set to a NULL pointer the function will write the extension
|
||||
* address this function depends on into this location.
|
||||
*/
|
||||
static bool
|
||||
IsObjectAddressOwnedByExtension(const ObjectAddress *target)
|
||||
bool
|
||||
IsObjectAddressOwnedByExtension(const ObjectAddress *target,
|
||||
ObjectAddress *extensionAddress)
|
||||
{
|
||||
Relation depRel = NULL;
|
||||
ScanKeyData key[2];
|
||||
|
@ -428,6 +429,11 @@ IsObjectAddressOwnedByExtension(const ObjectAddress *target)
|
|||
if (pg_depend->deptype == DEPENDENCY_EXTENSION)
|
||||
{
|
||||
result = true;
|
||||
if (extensionAddress != NULL)
|
||||
{
|
||||
ObjectAddressSubSet(*extensionAddress, pg_depend->refclassid,
|
||||
pg_depend->refobjid, pg_depend->refobjsubid);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -489,7 +495,7 @@ FollowNewSupportedDependencies(void *context, Form_pg_depend pg_depend)
|
|||
* Objects owned by an extension are assumed to be created on the workers by creating
|
||||
* the extension in the cluster
|
||||
*/
|
||||
if (IsObjectAddressOwnedByExtension(&address))
|
||||
if (IsObjectAddressOwnedByExtension(&address, NULL))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
@ -540,7 +546,7 @@ FollowAllSupportedDependencies(void *context, Form_pg_depend pg_depend)
|
|||
* Objects owned by an extension are assumed to be created on the workers by creating
|
||||
* the extension in the cluster
|
||||
*/
|
||||
if (IsObjectAddressOwnedByExtension(&address))
|
||||
if (IsObjectAddressOwnedByExtension(&address, NULL))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -72,6 +72,10 @@ extern const ObjectAddress * AlterFunctionSchemaStmtObjectAddress(
|
|||
extern void ProcessAlterFunctionSchemaStmt(AlterObjectSchemaStmt *stmt,
|
||||
const char *queryString);
|
||||
extern List * PlanDropFunctionStmt(DropStmt *stmt, const char *queryString);
|
||||
extern List * PlanAlterFunctionDependsStmt(AlterObjectDependsStmt *stmt,
|
||||
const char *queryString);
|
||||
extern const ObjectAddress * AlterFunctionDependsStmtObjectAddress(
|
||||
AlterObjectDependsStmt *stmt, bool missing_ok);
|
||||
|
||||
|
||||
/* grant.c - forward declarations */
|
||||
|
|
|
@ -21,6 +21,8 @@ extern bool IsObjectDistributed(const ObjectAddress *address);
|
|||
extern bool ClusterHasDistributedFunctionWithDistArgument(void);
|
||||
extern void MarkObjectDistributed(const ObjectAddress *distAddress);
|
||||
extern void UnmarkObjectDistributed(const ObjectAddress *address);
|
||||
extern bool IsObjectAddressOwnedByExtension(const ObjectAddress *target,
|
||||
ObjectAddress *extensionAddress);
|
||||
|
||||
extern List * GetDistributedObjectAddressList(void);
|
||||
|
||||
|
|
|
@ -307,6 +307,14 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY
|
|||
localhost | 57638 | t | 6
|
||||
(2 rows)
|
||||
|
||||
-- distributed functions should not be allowed to depend on an extension, also functions
|
||||
-- that depend on an extension should not be allowed to be distributed.
|
||||
ALTER FUNCTION add(int,int) DEPENDS ON EXTENSION citus;
|
||||
ERROR: distrtibuted functions are not allowed to depend on an extension
|
||||
DETAIL: Function "function_tests.add(integer,integer)" is already distributed. Functions from extensions are expected to be created on the workers by the extension they depend on.
|
||||
SELECT create_distributed_function('pg_catalog.citus_drop_trigger()');
|
||||
ERROR: unable to create a distributed function from functions owned by an extension
|
||||
DETAIL: Function "pg_catalog.citus_drop_trigger()" has a dependency on extension "citus". Functions depending on an extension cannot be distributed. Create the function by creating the extension on the workers.
|
||||
DROP FUNCTION add(int,int);
|
||||
-- call should fail as function should have been dropped
|
||||
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
|
||||
|
|
|
@ -158,6 +158,11 @@ AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us
|
|||
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
|
||||
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
|
||||
|
||||
-- distributed functions should not be allowed to depend on an extension, also functions
|
||||
-- that depend on an extension should not be allowed to be distributed.
|
||||
ALTER FUNCTION add(int,int) DEPENDS ON EXTENSION citus;
|
||||
SELECT create_distributed_function('pg_catalog.citus_drop_trigger()');
|
||||
|
||||
DROP FUNCTION add(int,int);
|
||||
-- call should fail as function should have been dropped
|
||||
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
|
||||
|
|
Loading…
Reference in New Issue