Hookup function/procedure deparsing to our utility hook (#3041)

DESCRIPTION: Propagate ALTER FUNCTION statements for distributed functions

Using the implemented deparser for function statements to propagate changes to both functions and procedures that are previously distributed.
pull/3046/head^2
Nils Dijk 2019-09-27 22:06:49 +02:00 committed by GitHub
parent 363409a0c2
commit 9c2c50d875
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1293 additions and 511 deletions

View File

@ -26,22 +26,26 @@
#include "access/xact.h"
#include "catalog/namespace.h"
#include "catalog/pg_proc.h"
#include "distributed/commands.h"
#include "catalog/pg_type.h"
#include "commands/extension.h"
#include "distributed/colocation_utils.h"
#include "distributed/master_protocol.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/deparser.h"
#include "distributed/maintenanced.h"
#include "distributed/metadata_sync.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata/pg_dist_object.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_executor.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/worker_transaction.h"
#include "parser/parse_coerce.h"
#include "parser/parse_type.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgrprotos.h"
#include "utils/fmgroids.h"
#include "utils/fmgrprotos.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
@ -62,10 +66,23 @@ static void UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
int *colocationId);
static void EnsureSequentialModeForFunctionDDL(void);
static void TriggerSyncMetadataToPrimaryNodes(void);
static bool ShouldPropagateAlterFunction(const ObjectAddress *address);
static ObjectAddress * FunctionToObjectAddress(ObjectType objectType,
ObjectWithArgs *objectWithArgs,
bool missing_ok);
static void ErrorIfUnsupportedAlterFunctionStmt(AlterFunctionStmt *stmt);
PG_FUNCTION_INFO_V1(create_distributed_function);
#if PG_VERSION_NUM > 110000
#define AssertIsFunctionOrProcedure(objtype) \
Assert((objtype) == OBJECT_FUNCTION || (objtype) == OBJECT_PROCEDURE)
#else
#define AssertIsFunctionOrProcedure(objtype) \
Assert(objtype == OBJECT_FUNCTION)
#endif
/*
* create_distributed_function gets a function or procedure name with their list of
@ -612,3 +629,472 @@ TriggerSyncMetadataToPrimaryNodes(void)
TriggerMetadataSync(MyDatabaseId);
}
}
/*
* ShouldPropagateAlterFunction returns, based on the address of a function, if alter
* statements targeting the function should be propagated.
*/
static bool
ShouldPropagateAlterFunction(const ObjectAddress *address)
{
if (creating_extension)
{
/*
* extensions should be created separately on the workers, functions cascading
* from an extension should therefore not be propagated.
*/
return false;
}
if (!EnableDependencyCreation)
{
/*
* we are configured to disable object propagation, should not propagate anything
*/
return false;
}
if (!IsObjectDistributed(address))
{
/* do not propagate alter function for non-distributed functions */
return false;
}
return true;
}
/*
* PlanAlterFunctionStmt is invoked for alter function statements with actions. Here we
* plan the jobs to be executed on the workers for functions that have been distributed in
* the cluster.
*/
List *
PlanAlterFunctionStmt(AlterFunctionStmt *stmt, const char *queryString)
{
const char *sql = NULL;
const ObjectAddress *address = NULL;
List *commands = NIL;
/* AlterFunctionStmt->objtype has only been added since pg11 */
#if PG_VERSION_NUM > 110000
AssertIsFunctionOrProcedure(stmt->objtype);
#endif
address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateAlterFunction(address))
{
return NIL;
}
EnsureCoordinator();
ErrorIfUnsupportedAlterFunctionStmt(stmt);
EnsureSequentialModeForFunctionDDL();
QualifyTreeNode((Node *) stmt);
sql = DeparseTreeNode((Node *) stmt);
commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
}
/*
* PlanRenameFunctionStmt is called when the user is renaming a function. The invocation
* happens before the statement is applied locally.
*
* As the function already exists we have access to the ObjectAddress, this is used to
* check if it is distributed. If so the rename is executed on all the workers to keep the
* types in sync across the cluster.
*/
List *
PlanRenameFunctionStmt(RenameStmt *stmt, const char *queryString)
{
const char *sql = NULL;
const ObjectAddress *address = NULL;
List *commands = NIL;
AssertIsFunctionOrProcedure(stmt->renameType);
address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateAlterFunction(address))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialModeForFunctionDDL();
QualifyTreeNode((Node *) stmt);
sql = DeparseTreeNode((Node *) stmt);
commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
}
/*
* PlanAlterFunctionSchemaStmt is executed before the statement is applied to the local
* postgres instance.
*
* In this stage we can prepare the commands that need to be run on all workers.
*/
List *
PlanAlterFunctionSchemaStmt(AlterObjectSchemaStmt *stmt, const char *queryString)
{
const char *sql = NULL;
const ObjectAddress *address = NULL;
List *commands = NIL;
AssertIsFunctionOrProcedure(stmt->objectType);
address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateAlterFunction(address))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialModeForFunctionDDL();
QualifyTreeNode((Node *) stmt);
sql = DeparseTreeNode((Node *) stmt);
commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
}
/*
* PlanAlterTypeOwnerStmt is called for change of owner ship of functions before the owner
* ship is changed on the local instance.
*
* If the function for which the owner is changed is distributed we execute the change on
* all the workers to keep the type in sync across the cluster.
*/
List *
PlanAlterFunctionOwnerStmt(AlterOwnerStmt *stmt, const char *queryString)
{
const ObjectAddress *address = NULL;
const char *sql = NULL;
List *commands = NULL;
AssertIsFunctionOrProcedure(stmt->objectType);
address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateAlterFunction(address))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialModeForFunctionDDL();
QualifyTreeNode((Node *) stmt);
sql = DeparseTreeNode((Node *) stmt);
commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
}
/*
* PlanDropFunctionStmt gets called during the planning phase of a DROP FUNCTION statement
* and returns a list of DDLJob's that will drop any distributed functions from the
* workers.
*
* The DropStmt could have multiple objects to drop, the list of objects will be filtered
* to only keep the distributed functions for deletion on the workers. Non-distributed
* functions will still be dropped locally but not on the workers.
*/
List *
PlanDropFunctionStmt(DropStmt *stmt, const char *queryString)
{
List *deletingObjectWithArgsList = stmt->objects;
List *distributedObjectWithArgsList = NIL;
List *distributedFunctionAddresses = NIL;
ListCell *addressCell = NULL;
const char *dropStmtSql = NULL;
List *commands = NULL;
ListCell *objectWithArgsListCell = NULL;
DropStmt *stmtCopy = NULL;
AssertIsFunctionOrProcedure(stmt->removeType);
if (creating_extension)
{
/*
* extensions should be created separately on the workers, types cascading from an
* extension should therefor not be propagated here.
*/
return NIL;
}
if (!EnableDependencyCreation)
{
/*
* we are configured to disable object propagation, should not propagate anything
*/
return NIL;
}
/*
* Our statements need to be fully qualified so we can drop them from the right schema
* on the workers
*/
QualifyTreeNode((Node *) stmt);
/*
* iterate over all functions to be dropped and filter to keep only distributed
* functions.
*/
foreach(objectWithArgsListCell, deletingObjectWithArgsList)
{
ObjectWithArgs *func = NULL;
ObjectAddress *address = NULL;
func = castNode(ObjectWithArgs, lfirst(objectWithArgsListCell));
address = FunctionToObjectAddress(stmt->removeType, func, stmt->missing_ok);
if (!IsObjectDistributed(address))
{
continue;
}
/* collect information for all distributed functions */
distributedFunctionAddresses = lappend(distributedFunctionAddresses, address);
distributedObjectWithArgsList = lappend(distributedObjectWithArgsList, func);
}
if (list_length(distributedObjectWithArgsList) <= 0)
{
/* no distributed functions to drop */
return NIL;
}
/*
* managing types can only be done on the coordinator if ddl propagation is on. when
* it is off we will never get here. MX workers don't have a notion of distributed
* types, so we block the call.
*/
EnsureCoordinator();
EnsureSequentialModeForFunctionDDL();
/* remove the entries for the distributed objects on dropping */
foreach(addressCell, distributedFunctionAddresses)
{
ObjectAddress *address = (ObjectAddress *) lfirst(addressCell);
UnmarkObjectDistributed(address);
}
/*
* Swap the list of objects before deparsing and restore the old list after. This
* ensures we only have distributed functions in the deparsed drop statement.
*/
stmtCopy = copyObject(stmt);
stmtCopy->objects = distributedObjectWithArgsList;
dropStmtSql = DeparseTreeNode((Node *) stmtCopy);
commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) dropStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
}
/*
* ProcessAlterFunctionSchemaStmt is executed after the change has been applied locally,
* we can now use the new dependencies of the function to ensure all its dependencies
* exist on the workers before we apply the commands remotely.
*/
void
ProcessAlterFunctionSchemaStmt(AlterObjectSchemaStmt *stmt, const char *queryString)
{
const ObjectAddress *address = NULL;
AssertIsFunctionOrProcedure(stmt->objectType);
address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateAlterFunction(address))
{
return;
}
/* dependencies have changed (schema) lets ensure they exist */
EnsureDependenciesExistsOnAllNodes(address);
}
/*
* AlterFunctionStmtObjectAddress returns the ObjectAddress of the subject in the
* AlterFunctionStmt. If missing_ok is set to false an error will be raised if postgres
* was unable to find the function/procedure that was the target of the statement.
*/
const ObjectAddress *
AlterFunctionStmtObjectAddress(AlterFunctionStmt *stmt, bool missing_ok)
{
ObjectType objectType = OBJECT_FUNCTION;
#if PG_VERSION_NUM > 110000
objectType = stmt->objtype;
#endif
return FunctionToObjectAddress(objectType, stmt->func, missing_ok);
}
/*
* RenameFunctionStmtObjectAddress returns the ObjectAddress of the function that is the
* subject of the RenameStmt. Errors if missing_ok is false.
*/
const ObjectAddress *
RenameFunctionStmtObjectAddress(RenameStmt *stmt, bool missing_ok)
{
return FunctionToObjectAddress(stmt->renameType,
castNode(ObjectWithArgs, stmt->object), missing_ok);
}
/*
* AlterFunctionOwnerObjectAddress returns the ObjectAddress of the function that is the
* subject of the AlterOwnerStmt. Errors if missing_ok is false.
*/
const ObjectAddress *
AlterFunctionOwnerObjectAddress(AlterOwnerStmt *stmt, bool missing_ok)
{
return FunctionToObjectAddress(stmt->objectType,
castNode(ObjectWithArgs, stmt->object), missing_ok);
}
/*
* AlterFunctionSchemaStmtObjectAddress returns the ObjectAddress of the function that is
* the subject of the AlterObjectSchemaStmt. Errors if missing_ok is false.
*
* This could be called both before or after it has been applied locally. It will look in
* the old schema first, if the function cannot be found in that schema it will look in
* the new schema. Errors if missing_ok is false and the type cannot be found in either of
* the schemas.
*/
const ObjectAddress *
AlterFunctionSchemaStmtObjectAddress(AlterObjectSchemaStmt *stmt, bool missing_ok)
{
ObjectWithArgs *objectWithArgs = NULL;
Oid funcOid = InvalidOid;
List *names = NIL;
ObjectAddress *address = NULL;
AssertIsFunctionOrProcedure(stmt->objectType);
objectWithArgs = castNode(ObjectWithArgs, stmt->object);
funcOid = LookupFuncWithArgsCompat(stmt->objectType, objectWithArgs, true);
names = objectWithArgs->objname;
if (funcOid == InvalidOid)
{
/*
* couldn't find the function, might have already been moved to the new schema, we
* construct a new objname that uses the new schema to search in.
*/
/* the name of the function is the last in the list of names */
Value *funcNameStr = lfirst(list_tail(names));
List *newNames = list_make2(makeString(stmt->newschema), funcNameStr);
/*
* we don't error here either, as the error would be not a good user facing
* error if the type didn't exist in the first place.
*/
objectWithArgs->objname = newNames;
funcOid = LookupFuncWithArgsCompat(stmt->objectType, objectWithArgs, true);
objectWithArgs->objname = names; /* restore the original names */
/*
* if the function is still invalid we couldn't find the function, cause postgres
* to error by preforming a lookup once more. Since we know the
*/
if (!missing_ok && funcOid == InvalidOid)
{
/*
* this will most probably throw an error, unless for some reason the function
* has just been created (if possible at all). For safety we assign the
* funcOid.
*/
funcOid = LookupFuncWithArgsCompat(stmt->objectType, objectWithArgs,
missing_ok);
}
}
address = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*address, ProcedureRelationId, funcOid);
return address;
}
/*
* FunctionToObjectAddress returns the ObjectAddress of a Function or Procedure based on
* its type and ObjectWithArgs describing the Function/Procedure. If missing_ok is set to
* false an error will be raised by postgres explaining the Function/Procedure could not
* be found.
*/
static ObjectAddress *
FunctionToObjectAddress(ObjectType objectType, ObjectWithArgs *objectWithArgs,
bool missing_ok)
{
Oid funcOid = InvalidOid;
ObjectAddress *address = NULL;
AssertIsFunctionOrProcedure(objectType);
funcOid = LookupFuncWithArgsCompat(objectType, objectWithArgs, missing_ok);
address = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*address, ProcedureRelationId, funcOid);
return address;
}
/*
* ErrorIfUnsupportedAlterFunctionStmt raises an error if the AlterFunctionStmt contains a
* construct that is not supported to be altered on a distributed function. It is assumed
* the statement passed in is already tested to be targeting a distributed function, and
* will only execute the checks to error on unsupported constructs.
*
* Unsupported Constructs:
* - ALTER FUNCTION ... SET ... FROM CURRENT
*/
static void
ErrorIfUnsupportedAlterFunctionStmt(AlterFunctionStmt *stmt)
{
ListCell *actionCell = NULL;
foreach(actionCell, stmt->actions)
{
DefElem *action = castNode(DefElem, lfirst(actionCell));
if (strcmp(action->defname, "set") == 0)
{
VariableSetStmt *setStmt = castNode(VariableSetStmt, action->arg);
if (setStmt->kind == VAR_SET_CURRENT)
{
/* check if the set action is a SET ... FROM CURRENT */
ereport(ERROR, (errmsg("unsupported ALTER FUNCTION ... SET ... FROM "
"CURRENT for a distributed function"),
errhint("SET FROM CURRENT is not supported for "
"distributed functions, instead use the SET ... "
"TO ... syntax with a constant value.")));
}
}
}
}

View File

@ -121,6 +121,14 @@ PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt, const char *queryString)
return PlanAlterTypeSchemaStmt(stmt, queryString);
}
#if PG_VERSION_NUM > 110000
case OBJECT_PROCEDURE:
#endif
case OBJECT_FUNCTION:
{
return PlanAlterFunctionSchemaStmt(stmt, queryString);
}
default:
{
/* do nothing for unsupported objects */
@ -190,6 +198,15 @@ ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt, const char *queryStrin
return;
}
#if PG_VERSION_NUM > 110000
case OBJECT_PROCEDURE:
#endif
case OBJECT_FUNCTION:
{
ProcessAlterFunctionSchemaStmt(stmt, queryString);
return;
}
default:
{
/* do nothing for unsupported objects */

View File

@ -424,6 +424,14 @@ multi_ProcessUtility(PlannedStmt *pstmt,
break;
}
#if PG_VERSION_NUM > 110000
case OBJECT_PROCEDURE:
#endif
case OBJECT_FUNCTION:
{
ddlJobs = PlanDropFunctionStmt(dropStatement, queryString);
}
default:
{
/* unsupported type, skipping*/
@ -474,6 +482,15 @@ multi_ProcessUtility(PlannedStmt *pstmt,
break;
}
#if PG_VERSION_NUM > 110000
case OBJECT_PROCEDURE:
#endif
case OBJECT_FUNCTION:
{
ddlJobs = PlanRenameFunctionStmt(renameStmt, queryString);
break;
}
default:
{
ddlJobs = PlanRenameStmt(renameStmt, queryString);
@ -531,6 +548,12 @@ multi_ProcessUtility(PlannedStmt *pstmt,
queryString);
}
if (IsA(parsetree, AlterFunctionStmt))
{
ddlJobs = PlanAlterFunctionStmt(castNode(AlterFunctionStmt, parsetree),
queryString);
}
/*
* ALTER TABLE ALL IN TABLESPACE statements have their node type as
* AlterTableMoveAllStmt. At the moment we do not support this functionality in
@ -799,6 +822,14 @@ PlanAlterOwnerStmt(AlterOwnerStmt *stmt, const char *queryString)
return PlanAlterTypeOwnerStmt(stmt, queryString);
}
#if PG_VERSION_NUM > 110000
case OBJECT_PROCEDURE:
#endif
case OBJECT_FUNCTION:
{
return PlanAlterFunctionOwnerStmt(stmt, queryString);
}
default:
{
/* do nothing for unsupported alter owner statements */

View File

@ -76,6 +76,12 @@ GetObjectAddressFromParseTree(Node *parseTree, bool missing_ok)
missing_ok);
}
case T_AlterFunctionStmt:
{
return AlterFunctionStmtObjectAddress(castNode(AlterFunctionStmt, parseTree),
missing_ok);
}
default:
{
/*
@ -123,6 +129,14 @@ RenameStmtObjectAddress(RenameStmt *stmt, bool missing_ok)
return RenameAttributeStmtObjectAddress(stmt, missing_ok);
}
#if PG_VERSION_NUM > 110000
case OBJECT_PROCEDURE:
#endif
case OBJECT_FUNCTION:
{
return RenameFunctionStmtObjectAddress(stmt, missing_ok);
}
default:
{
ereport(ERROR, (errmsg("unsupported rename statement to get object address "
@ -142,6 +156,14 @@ AlterObjectSchemaStmtObjectAddress(AlterObjectSchemaStmt *stmt, bool missing_ok)
return AlterTypeSchemaStmtObjectAddress(stmt, missing_ok);
}
#if PG_VERSION_NUM > 110000
case OBJECT_PROCEDURE:
#endif
case OBJECT_FUNCTION:
{
return AlterFunctionSchemaStmtObjectAddress(stmt, missing_ok);
}
default:
{
ereport(ERROR, (errmsg("unsupported alter schema statement to get object "
@ -182,6 +204,14 @@ AlterOwnerStmtObjectAddress(AlterOwnerStmt *stmt, bool missing_ok)
return AlterTypeOwnerObjectAddress(stmt, missing_ok);
}
#if PG_VERSION_NUM > 110000
case OBJECT_PROCEDURE:
#endif
case OBJECT_FUNCTION:
{
return AlterFunctionOwnerObjectAddress(stmt, missing_ok);
}
default:
{
ereport(ERROR, (errmsg("unsupported alter owner statement to get object "

View File

@ -50,6 +50,25 @@ extern bool TableReferencing(Oid relationId);
extern bool ConstraintIsAForeignKey(char *constraintName, Oid relationId);
/* function.c - forward declarations */
extern List * PlanAlterFunctionStmt(AlterFunctionStmt *stmt, const char *queryString);
extern const ObjectAddress * AlterFunctionStmtObjectAddress(AlterFunctionStmt *stmt,
bool missing_ok);
extern List * PlanRenameFunctionStmt(RenameStmt *stmt, const char *queryString);
extern const ObjectAddress * RenameFunctionStmtObjectAddress(RenameStmt *stmt,
bool missing_ok);
extern List * PlanAlterFunctionOwnerStmt(AlterOwnerStmt *stmt, const char *queryString);
extern const ObjectAddress * AlterFunctionOwnerObjectAddress(AlterOwnerStmt *stmt,
bool missing_ok);
extern List * PlanAlterFunctionSchemaStmt(AlterObjectSchemaStmt *stmt,
const char *queryString);
extern const ObjectAddress * AlterFunctionSchemaStmtObjectAddress(
AlterObjectSchemaStmt *stmt, bool missing_ok);
extern void ProcessAlterFunctionSchemaStmt(AlterObjectSchemaStmt *stmt,
const char *queryString);
extern List * PlanDropFunctionStmt(DropStmt *stmt, const char *queryString);
/* grant.c - forward declarations */
extern List * PlanGrantStmt(GrantStmt *grantStmt);

View File

@ -10,6 +10,7 @@ SELECT run_command_on_workers($$CREATE USER functionuser;$$);
(2 rows)
CREATE SCHEMA function_tests AUTHORIZATION functionuser;
CREATE SCHEMA function_tests2 AUTHORIZATION functionuser;
SET search_path TO function_tests;
SET citus.shard_count TO 4;
-- Create and distribute a simple function
@ -56,6 +57,17 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
-- make sure to propagate ddl propagation after we have setup our functions, this will
-- allow alter statements to be propagated and keep the functions in sync across machines
SET citus.enable_ddl_propagation TO on;
-- functions are distributed by int arguments, when run in isolation it is not guaranteed a table actually exists.
CREATE TABLE colocation_table(id int);
SELECT create_distributed_table('colocation_table','id');
create_distributed_table
--------------------------
(1 row)
-- make sure that none of the active and primary nodes hasmetadata
-- at the start of the test
select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary';
@ -121,6 +133,170 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY
localhost | 57638 | t | 5
(2 rows)
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
-- testing alter statements for a distributed function
-- ROWS 5, untested because;
-- ERROR: ROWS is not applicable when function does not return a set
ALTER FUNCTION add(int,int) CALLED ON NULL INPUT IMMUTABLE SECURITY INVOKER PARALLEL UNSAFE LEAKPROOF COST 5;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
ALTER FUNCTION add(int,int) RETURNS NULL ON NULL INPUT STABLE SECURITY DEFINER PARALLEL RESTRICTED;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
ALTER FUNCTION add(int,int) STRICT VOLATILE PARALLEL SAFE;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
-- Test SET/RESET for alter function
ALTER FUNCTION add(int,int) SET client_min_messages TO warning;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
ALTER FUNCTION add(int,int) SET client_min_messages TO error;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
ALTER FUNCTION add(int,int) SET client_min_messages TO debug;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
ALTER FUNCTION add(int,int) RESET client_min_messages;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
-- SET ... FROM CURRENT is not supported, verify the query fails with a descriptive error irregardless of where in the action list the statement occurs
ALTER FUNCTION add(int,int) SET client_min_messages FROM CURRENT;
ERROR: unsupported ALTER FUNCTION ... SET ... FROM CURRENT for a distributed function
HINT: SET FROM CURRENT is not supported for distributed functions, instead use the SET ... TO ... syntax with a constant value.
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
ALTER FUNCTION add(int,int) RETURNS NULL ON NULL INPUT SET client_min_messages FROM CURRENT;
ERROR: unsupported ALTER FUNCTION ... SET ... FROM CURRENT for a distributed function
HINT: SET FROM CURRENT is not supported for distributed functions, instead use the SET ... TO ... syntax with a constant value.
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
ALTER FUNCTION add(int,int) SET client_min_messages FROM CURRENT SECURITY DEFINER;
ERROR: unsupported ALTER FUNCTION ... SET ... FROM CURRENT for a distributed function
HINT: SET FROM CURRENT is not supported for distributed functions, instead use the SET ... TO ... syntax with a constant value.
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
-- rename function and make sure the new name can be used on the workers while the old name can't
ALTER FUNCTION add(int,int) RENAME TO add2;
SELECT public.verify_function_is_same_on_workers('function_tests.add2(int,int)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+----------------------------------------------------------------------
localhost | 57637 | f | ERROR: function function_tests.add(integer, integer) does not exist
localhost | 57638 | f | ERROR: function function_tests.add(integer, integer) does not exist
(2 rows)
SELECT * FROM run_command_on_workers('SELECT function_tests.add2(2,3);') ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+--------
localhost | 57637 | t | 5
localhost | 57638 | t | 5
(2 rows)
ALTER FUNCTION add2(int,int) RENAME TO add;
-- change the owner of the function and verify the owner has been changed on the workers
ALTER FUNCTION add(int,int) OWNER TO functionuser;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
SELECT run_command_on_workers($$
SELECT row(usename, nspname, proname)
FROM pg_proc
JOIN pg_user ON (usesysid = proowner)
JOIN pg_namespace ON (pg_namespace.oid = pronamespace)
WHERE proname = 'add';
$$);
run_command_on_workers
---------------------------------------------------------
(localhost,57637,t,"(functionuser,function_tests,add)")
(localhost,57638,t,"(functionuser,function_tests,add)")
(2 rows)
-- change the schema of the function and verify the old schema doesn't exist anymore while
-- the new schema has the function.
ALTER FUNCTION add(int,int) SET SCHEMA function_tests2;
SELECT public.verify_function_is_same_on_workers('function_tests2.add(int,int)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+----------------------------------------------------------------------
localhost | 57637 | f | ERROR: function function_tests.add(integer, integer) does not exist
localhost | 57638 | f | ERROR: function function_tests.add(integer, integer) does not exist
(2 rows)
SELECT * FROM run_command_on_workers('SELECT function_tests2.add(2,3);') ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+--------
localhost | 57637 | t | 5
localhost | 57638 | t | 5
(2 rows)
ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests;
DROP FUNCTION add(int,int);
-- call should fail as function should have been dropped
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+----------------------------------------------------------------------
localhost | 57637 | f | ERROR: function function_tests.add(integer, integer) does not exist
localhost | 57638 | f | ERROR: function function_tests.add(integer, integer) does not exist
(2 rows)
-- postgres doesn't accept parameter names in the regprocedure input
SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1');
ERROR: syntax error at or near "int"
@ -371,6 +547,7 @@ SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isa
SET client_min_messages TO error; -- suppress cascading objects dropping
DROP SCHEMA function_tests CASCADE;
DROP SCHEMA function_tests2 CASCADE;
-- This is hacky, but we should clean-up the resources as below
\c - - - :worker_1_port
SET client_min_messages TO error; -- suppress cascading objects dropping
@ -385,6 +562,7 @@ SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition
TRUNCATE pg_dist_node;
DROP SCHEMA function_tests CASCADE;
DROP SCHEMA function_tests2 CASCADE;
\c - - - :worker_2_port
SET client_min_messages TO error; -- suppress cascading objects dropping
UPDATE pg_dist_local_group SET groupid = 0;
@ -398,6 +576,7 @@ SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition
TRUNCATE pg_dist_node;
DROP SCHEMA function_tests CASCADE;
DROP SCHEMA function_tests2 CASCADE;
\c - - - :master_port
DROP USER functionuser;
SELECT run_command_on_workers($$DROP USER functionuser;$$);

View File

@ -0,0 +1,204 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 11 AS server_verion_eleven_and_above
\gset
\if :server_verion_eleven_and_above
\else
\q
\endif
SET citus.next_shard_id TO 20030000;
CREATE USER procedureuser;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
SELECT run_command_on_workers($$CREATE USER procedureuser;$$);
run_command_on_workers
-----------------------------------
(localhost,57637,t,"CREATE ROLE")
(localhost,57638,t,"CREATE ROLE")
(2 rows)
CREATE SCHEMA procedure_tests AUTHORIZATION procedureuser;
CREATE SCHEMA procedure_tests2 AUTHORIZATION procedureuser;
SET search_path TO procedure_tests;
SET citus.shard_count TO 4;
-- Create and distribute a simple function
CREATE OR REPLACE PROCEDURE raise_info(text)
LANGUAGE PLPGSQL AS $proc$
BEGIN
RAISE INFO 'information message %', $1;
END;
$proc$;
-- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists.
CREATE TABLE colocation_table(id text);
SELECT create_distributed_table('colocation_table','id');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_function('raise_info(text)', '$1');
create_distributed_function
-----------------------------
(1 row)
SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+--------
localhost | 57637 | t | CALL
localhost | 57638 | t | CALL
(2 rows)
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
-- testing alter statements for a distributed function
-- ROWS 5, untested because;
-- ERROR: ROWS is not applicable when function does not return a set
ALTER PROCEDURE raise_info(text) SECURITY INVOKER;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
ALTER PROCEDURE raise_info(text) SECURITY DEFINER;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
-- Test SET/RESET for alter procedure
ALTER PROCEDURE raise_info(text) SET client_min_messages TO warning;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
ALTER PROCEDURE raise_info(text) SET client_min_messages TO error;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
ALTER PROCEDURE raise_info(text) SET client_min_messages TO debug;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
ALTER PROCEDURE raise_info(text) RESET client_min_messages;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
-- rename function and make sure the new name can be used on the workers while the old name can't
ALTER PROCEDURE raise_info(text) RENAME TO raise_info2;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info2(text)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+----------------------------------------------------------------------
localhost | 57637 | f | ERROR: procedure procedure_tests.raise_info(unknown) does not exist
localhost | 57638 | f | ERROR: procedure procedure_tests.raise_info(unknown) does not exist
(2 rows)
SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info2('hello');$$) ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+--------
localhost | 57637 | t | CALL
localhost | 57638 | t | CALL
(2 rows)
ALTER PROCEDURE raise_info2(text) RENAME TO raise_info;
-- change the owner of the function and verify the owner has been changed on the workers
ALTER PROCEDURE raise_info(text) OWNER TO procedureuser;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
SELECT run_command_on_workers($$
SELECT row(usename, nspname, proname)
FROM pg_proc
JOIN pg_user ON (usesysid = proowner)
JOIN pg_namespace ON (pg_namespace.oid = pronamespace)
WHERE proname = 'raise_info';
$$);
run_command_on_workers
------------------------------------------------------------------
(localhost,57637,t,"(procedureuser,procedure_tests,raise_info)")
(localhost,57638,t,"(procedureuser,procedure_tests,raise_info)")
(2 rows)
-- change the schema of the procedure and verify the old schema doesn't exist anymore while
-- the new schema has the function.
ALTER PROCEDURE raise_info(text) SET SCHEMA procedure_tests2;
SELECT public.verify_function_is_same_on_workers('procedure_tests2.raise_info(text)');
verify_function_is_same_on_workers
------------------------------------
t
(1 row)
SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+----------------------------------------------------------------------
localhost | 57637 | f | ERROR: procedure procedure_tests.raise_info(unknown) does not exist
localhost | 57638 | f | ERROR: procedure procedure_tests.raise_info(unknown) does not exist
(2 rows)
SELECT * FROM run_command_on_workers($$CALL procedure_tests2.raise_info('hello');$$) ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+--------
localhost | 57637 | t | CALL
localhost | 57638 | t | CALL
(2 rows)
ALTER PROCEDURE procedure_tests2.raise_info(text) SET SCHEMA procedure_tests;
DROP PROCEDURE raise_info(text);
-- call should fail as procedure should have been dropped
SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+----------------------------------------------------------------------
localhost | 57637 | f | ERROR: procedure procedure_tests.raise_info(unknown) does not exist
localhost | 57638 | f | ERROR: procedure procedure_tests.raise_info(unknown) does not exist
(2 rows)
SET client_min_messages TO error; -- suppress cascading objects dropping
DROP SCHEMA procedure_tests CASCADE;
SELECT run_command_on_workers($$DROP SCHEMA procedure_tests CASCADE;$$);
run_command_on_workers
-----------------------------------
(localhost,57637,t,"DROP SCHEMA")
(localhost,57638,t,"DROP SCHEMA")
(2 rows)
DROP SCHEMA procedure_tests2 CASCADE;
SELECT run_command_on_workers($$DROP SCHEMA procedure_tests2 CASCADE;$$);
run_command_on_workers
-----------------------------------
(localhost,57637,t,"DROP SCHEMA")
(localhost,57638,t,"DROP SCHEMA")
(2 rows)
DROP USER procedureuser;
SELECT run_command_on_workers($$DROP USER procedureuser;$$);
run_command_on_workers
---------------------------------
(localhost,57637,t,"DROP ROLE")
(localhost,57638,t,"DROP ROLE")
(2 rows)

View File

@ -0,0 +1,6 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 11 AS server_verion_eleven_and_above
\gset
\if :server_verion_eleven_and_above
\else
\q

View File

@ -29,11 +29,11 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
276 275 f
274 273 f
transactionnumberwaitingtransactionnumbers
275
276 275
273
274 273
step s1-abort:
ABORT;
@ -77,14 +77,14 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
280 279 f
281 279 f
281 280 t
278 277 f
279 277 f
279 278 t
transactionnumberwaitingtransactionnumbers
279
280 279
281 279,280
277
278 277
279 277,278
step s1-abort:
ABORT;

View File

@ -1571,7 +1571,7 @@ master_remove_node
starting permutation: s1-print-distributed-objects s1-begin s1-add-worker s2-public-schema s2-distribute-function s1-commit s2-begin s2-commit s3-wait-for-metadata-sync s1-print-distributed-objects
starting permutation: s1-print-distributed-objects s1-begin s1-add-worker s2-public-schema s2-distribute-function s1-commit s2-begin s2-commit s3-wait-for-metadata-sync s2-print-distributed-objects
?column?
1
@ -1659,9 +1659,7 @@ step s3-wait-for-metadata-sync:
wait_until_metadata_sync
step s1-print-distributed-objects:
SELECT 1 FROM master_add_node('localhost', 57638);
step s2-print-distributed-objects:
-- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object ORDER BY 1;
@ -1677,11 +1675,6 @@ step s1-print-distributed-objects:
SELECT count(*) FROM pg_proc WHERE proname='add';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add';$$);
SELECT master_remove_node('localhost', 57638);
?column?
1
pg_identify_object_as_address
(function,"{public,add}","{integer,integer}")
@ -1709,9 +1702,6 @@ run_command_on_workers
master_remove_node
master_remove_node
starting permutation: s1-print-distributed-objects s1-begin s2-public-schema s2-distribute-function s2-begin s2-commit s3-wait-for-metadata-sync s1-add-worker s1-commit s3-wait-for-metadata-sync s2-print-distributed-objects

View File

@ -31,6 +31,7 @@
-- DROP FUNCTION [ IF EXISTS ] name [ ( [ [ argmode ] [ argname ] argtype [, ...] ] ) ] [, ...]
-- [ CASCADE | RESTRICT ]
SET citus.next_shard_id TO 20020000;
SET citus.enable_ddl_propagation TO off;
CREATE SCHEMA function_tests;
SET search_path TO function_tests;
SET citus.shard_count TO 4;

View File

@ -1,3 +1,10 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 11 AS server_verion_eleven_and_above
\gset
\if :server_verion_eleven_and_above
\else
\q
\endif
--
-- Regression tests for deparsing ALTER/DROP PROCEDURE Queries
--
@ -26,6 +33,7 @@
-- For example CALLED ON NULL INPUT action is valid only for FUNCTIONS, but we still
-- allow deparsing them here.
SET citus.next_shard_id TO 20030000;
SET citus.enable_ddl_propagation TO off;
CREATE SCHEMA procedure_tests;
SET search_path TO procedure_tests;
SET citus.shard_count TO 4;
@ -288,8 +296,9 @@ $cmd$);
(localhost,57638,t,"ALTER PROCEDURE")
(2 rows)
-- rename and rename back to keep the nodes in sync
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info RENAME TO summation
ALTER PROCEDURE raise_info RENAME TO summation;
$cmd$);
deparse_and_run_on_workers
---------------------------------------
@ -297,16 +306,34 @@ $cmd$);
(localhost,57638,t,"ALTER PROCEDURE")
(2 rows)
CREATE ROLE PROCEDURE_role;
ALTER PROCEDURE raise_info RENAME TO summation;
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE summation RENAME TO raise_info;
$cmd$);
deparse_and_run_on_workers
---------------------------------------
(localhost,57637,t,"ALTER PROCEDURE")
(localhost,57638,t,"ALTER PROCEDURE")
(2 rows)
ALTER PROCEDURE summation RENAME TO raise_info;
CREATE ROLE procedure_role;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
SELECT run_command_on_workers($$CREATE ROLE procedure_role;$$);
run_command_on_workers
-----------------------------------
(localhost,57637,t,"CREATE ROLE")
(localhost,57638,t,"CREATE ROLE")
(2 rows)
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info OWNER TO PROCEDURE_role
ALTER PROCEDURE raise_info OWNER TO procedure_role
$cmd$);
deparse_and_run_on_workers
----------------------------------------------------------------------
(localhost,57637,f,"ERROR: role ""procedure_role"" does not exist")
(localhost,57638,f,"ERROR: role ""procedure_role"" does not exist")
deparse_and_run_on_workers
---------------------------------------
(localhost,57637,t,"ALTER PROCEDURE")
(localhost,57638,t,"ALTER PROCEDURE")
(2 rows)
SELECT deparse_and_run_on_workers($cmd$
@ -318,26 +345,38 @@ $cmd$);
(localhost,57638,f,"ERROR: role ""missing_role"" does not exist")
(2 rows)
-- move schema and back to keep the nodes in sync
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info SET SCHEMA public
ALTER PROCEDURE raise_info SET SCHEMA public;
$cmd$);
deparse_and_run_on_workers
-----------------------------------------------------------------------------------------
(localhost,57637,f,"ERROR: procedure procedure_tests.raise_info(text) does not exist")
(localhost,57638,f,"ERROR: procedure procedure_tests.raise_info(text) does not exist")
deparse_and_run_on_workers
---------------------------------------
(localhost,57637,t,"ALTER PROCEDURE")
(localhost,57638,t,"ALTER PROCEDURE")
(2 rows)
ALTER PROCEDURE raise_info SET SCHEMA public;
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE public.raise_info SET SCHEMA procedure_tests;
$cmd$);
deparse_and_run_on_workers
---------------------------------------
(localhost,57637,t,"ALTER PROCEDURE")
(localhost,57638,t,"ALTER PROCEDURE")
(2 rows)
ALTER PROCEDURE public.raise_info SET SCHEMA procedure_tests;
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info DEPENDS ON EXTENSION citus
$cmd$);
deparse_and_run_on_workers
-----------------------------------------------------------------------------------------
(localhost,57637,f,"ERROR: procedure procedure_tests.raise_info(text) does not exist")
(localhost,57638,f,"ERROR: procedure procedure_tests.raise_info(text) does not exist")
deparse_and_run_on_workers
---------------------------------------
(localhost,57637,t,"ALTER PROCEDURE")
(localhost,57638,t,"ALTER PROCEDURE")
(2 rows)
SELECT deparse_and_run_on_workers($cmd$
DROP PROCEDURE IF EXISTS raise_info(int,int);
DROP PROCEDURE raise_info(text);
$cmd$);
deparse_and_run_on_workers
--------------------------------------
@ -376,4 +415,11 @@ $cmd$);
-- clear objects
SET client_min_messages TO WARNING; -- suppress cascading objects dropping
DROP SCHEMA procedure_tests CASCADE;
DROP ROLE PROCEDURE_role;
DROP ROLE procedure_role;
SELECT run_command_on_workers($$DROP ROLE procedure_role;$$);
run_command_on_workers
---------------------------------
(localhost,57637,t,"DROP ROLE")
(localhost,57638,t,"DROP ROLE")
(2 rows)

View File

@ -1,448 +1,6 @@
--
-- Regression tests for deparsing ALTER/DROP PROCEDURE Queries
--
-- ALTER PROCEDURE name [ ( [ [ argmode ] [ argname ] argtype [, ...] ] ) ]
-- action [ ... ] [ RESTRICT ]
-- ALTER PROCEDURE name [ ( [ [ argmode ] [ argname ] argtype [, ...] ] ) ]
-- RENAME TO new_name
-- ALTER PROCEDURE name [ ( [ [ argmode ] [ argname ] argtype [, ...] ] ) ]
-- OWNER TO { new_owner | CURRENT_USER | SESSION_USER }
-- ALTER PROCEDURE name [ ( [ [ argmode ] [ argname ] argtype [, ...] ] ) ]
-- SET SCHEMA new_schema
-- ALTER PROCEDURE name [ ( [ [ argmode ] [ argname ] argtype [, ...] ] ) ]
-- DEPENDS ON EXTENSION extension_name
-- where action is one of:
-- [ EXTERNAL ] SECURITY INVOKER | [ EXTERNAL ] SECURITY DEFINER
-- SET configuration_parameter { TO | = } { value | DEFAULT }
-- SET configuration_parameter FROM CURRENT
-- RESET configuration_parameter
-- RESET ALL
--
-- DROP PROCEDURE [ IF EXISTS ] name [ ( [ [ argmode ] [ argname ] argtype [, ...] ] ) ] [, ...]
-- [ CASCADE | RESTRICT ]
--
-- Please note that current deparser does not return errors on some invalid queries.
--
-- For example CALLED ON NULL INPUT action is valid only for FUNCTIONS, but we still
-- allow deparsing them here.
SET citus.next_shard_id TO 20030000;
CREATE SCHEMA procedure_tests;
SET search_path TO procedure_tests;
SET citus.shard_count TO 4;
SET client_min_messages TO INFO;
-- print whether we're using version > 10 to make version-specific tests clear
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int > 10 AS version_above_ten;
version_above_ten
-------------------
f
(1 row)
CREATE FUNCTION deparse_test(text)
RETURNS text
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION deparse_and_run_on_workers(text)
RETURNS SETOF record
AS $fnc$
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
$fnc$
LANGUAGE SQL;
-- Create a simple PROCEDURE and distribute it
CREATE OR REPLACE PROCEDURE raise_info(text)
LANGUAGE PLPGSQL AS $proc$
BEGIN
RAISE INFO 'information message %', $1;
END;
$proc$;
ERROR: syntax error at or near "PROCEDURE"
LINE 1: CREATE OR REPLACE PROCEDURE raise_info(text)
^
SELECT create_distributed_function('raise_info(text)');
ERROR: function "raise_info(text)" does not exist
LINE 1: SELECT create_distributed_function('raise_info(text)');
^
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info CALLED ON NULL INPUT
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info RETURNS NULL ON NULL INPUT
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info STRICT
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info IMMUTABLE
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info STABLE
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info VOLATILE
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info LEAKPROOF
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info NOT LEAKPROOF
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info EXTERNAL SECURITY INVOKER
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info SECURITY INVOKER
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info EXTERNAL SECURITY DEFINER
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info SECURITY DEFINER
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info PARALLEL UNSAFE
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info PARALLEL RESTRICTED
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info PARALLEL SAFE
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
-- The COST/ROWS arguments should always be numeric
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info COST 1234
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info COST 1234.5
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info ROWS 10
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info ROWS 10.8
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info SECURITY INVOKER SET client_min_messages TO warning;
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info SET log_min_messages = ERROR
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info SET log_min_messages TO DEFAULT
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info SET log_min_messages FROM CURRENT
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info RESET log_min_messages
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info RESET ALL
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info RENAME TO summation
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
CREATE ROLE PROCEDURE_role;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info OWNER TO PROCEDURE_role
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info OWNER TO missing_role
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info SET SCHEMA public
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info DEPENDS ON EXTENSION citus
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
DROP PROCEDURE IF EXISTS raise_info(int,int);
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
-- Check that an invalid PROCEDURE name is still parsed correctly
SELECT deparse_and_run_on_workers($cmd$
DROP PROCEDURE IF EXISTS missing_PROCEDURE(int, text);
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
DROP PROCEDURE IF EXISTS missing_schema.missing_PROCEDURE(int,float);
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
SELECT deparse_and_run_on_workers($cmd$
DROP PROCEDURE IF EXISTS missing_schema.missing_PROCEDURE(int,float) CASCADE;
$cmd$);
ERROR: syntax error at or near "PROCEDURE"
LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi...
^
QUERY:
WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query )
SELECT run_command_on_workers(qualified_query) FROM deparsed_query d
CONTEXT: SQL function "deparse_and_run_on_workers" statement 1
-- clear objects
SET client_min_messages TO WARNING; -- suppress cascading objects dropping
DROP SCHEMA procedure_tests CASCADE;
DROP ROLE PROCEDURE_role;
SELECT substring(:'server_version', '\d+')::int >= 11 AS server_verion_eleven_and_above
\gset
\if :server_verion_eleven_and_above
\else
\q

View File

@ -54,6 +54,8 @@ WHERE pgd.refclassid = 'pg_extension'::regclass AND
-- DROP EXTENSION pre-created by the regression suite
DROP EXTENSION citus;
\c
-- these tests switch between citus versions and call ddl's that require pg_dist_object to be created
SET citus.enable_object_propagation TO 'false';
SET citus.enable_version_checks TO 'false';
CREATE EXTENSION citus VERSION '7.0-1';
ALTER EXTENSION citus UPDATE TO '7.0-2';

View File

@ -150,3 +150,23 @@ BEGIN
from pg_proc, pg_dist_partition
where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid;
END;$$;
-- helper function to verify the function of a coordinator is the same on all workers
CREATE OR REPLACE FUNCTION verify_function_is_same_on_workers(funcname text)
RETURNS bool
LANGUAGE plpgsql
AS $func$
DECLARE
coordinatorSql text;
workerSql text;
BEGIN
SELECT pg_get_functiondef(funcname::regprocedure) INTO coordinatorSql;
FOR workerSql IN SELECT result FROM run_command_on_workers('SELECT pg_get_functiondef(' || quote_literal(funcname) || '::regprocedure)') LOOP
IF workerSql != coordinatorSql THEN
RAISE INFO 'functions are different, coordinator:% worker:%', coordinatorSql, workerSql;
RETURN false;
END IF;
END LOOP;
RETURN true;
END;
$func$;

View File

@ -282,6 +282,7 @@ test: ssl_by_default
# ---------
test: distributed_types distributed_types_conflict disable_object_propagation
test: distributed_functions
test: distributed_procedure
# ---------
# deparsing logic tests

View File

@ -30,20 +30,6 @@ teardown
DROP TYPE IF EXISTS tt1 CASCADE;
DROP FUNCTION IF EXISTS add(INT,INT) CASCADE;
-- Remove the pg_dist_object record manually as we do not yet hook into DROP FUNC
-- queries. If the function does not exist, the casting to regprocedure fails.
DO
$do$
BEGIN
DELETE FROM citus.pg_dist_object WHERE objid = 'add(int,int)'::regprocedure;
EXCEPTION
WHEN undefined_function THEN RETURN;
END;
$do$;
-- similarly drop the function in the workers manually
SELECT run_command_on_workers($$DROP FUNCTION IF EXISTS add(INT,INT) CASCADE;$$);
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node;
}
@ -230,7 +216,7 @@ permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s2-create-sche
# s3-wait-for-metadata-sync step, we do "s2-begin" followed directly by
# "s2-commit", because "COMMIT" syncs the messages
permutation "s1-print-distributed-objects" "s1-begin" "s1-add-worker" "s2-public-schema" "s2-distribute-function" "s1-commit" "s2-begin" "s2-commit" "s3-wait-for-metadata-sync" "s1-print-distributed-objects"
permutation "s1-print-distributed-objects" "s1-begin" "s1-add-worker" "s2-public-schema" "s2-distribute-function" "s1-commit" "s2-begin" "s2-commit" "s3-wait-for-metadata-sync" "s2-print-distributed-objects"
permutation "s1-print-distributed-objects" "s1-begin" "s2-public-schema" "s2-distribute-function" "s2-begin" "s2-commit" "s3-wait-for-metadata-sync" "s1-add-worker" "s1-commit" "s3-wait-for-metadata-sync" "s2-print-distributed-objects"
# we cannot run the following operations concurrently

View File

@ -4,6 +4,7 @@ CREATE USER functionuser;
SELECT run_command_on_workers($$CREATE USER functionuser;$$);
CREATE SCHEMA function_tests AUTHORIZATION functionuser;
CREATE SCHEMA function_tests2 AUTHORIZATION functionuser;
SET search_path TO function_tests;
SET citus.shard_count TO 4;
@ -62,6 +63,14 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer
IMMUTABLE
RETURNS NULL ON NULL INPUT;
-- make sure to propagate ddl propagation after we have setup our functions, this will
-- allow alter statements to be propagated and keep the functions in sync across machines
SET citus.enable_ddl_propagation TO on;
-- functions are distributed by int arguments, when run in isolation it is not guaranteed a table actually exists.
CREATE TABLE colocation_table(id int);
SELECT create_distributed_table('colocation_table','id');
-- make sure that none of the active and primary nodes hasmetadata
-- at the start of the test
select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary';
@ -84,6 +93,65 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY
SELECT create_distributed_function('add(int,int)', '$1');
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
-- testing alter statements for a distributed function
-- ROWS 5, untested because;
-- ERROR: ROWS is not applicable when function does not return a set
ALTER FUNCTION add(int,int) CALLED ON NULL INPUT IMMUTABLE SECURITY INVOKER PARALLEL UNSAFE LEAKPROOF COST 5;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION add(int,int) RETURNS NULL ON NULL INPUT STABLE SECURITY DEFINER PARALLEL RESTRICTED;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION add(int,int) STRICT VOLATILE PARALLEL SAFE;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
-- Test SET/RESET for alter function
ALTER FUNCTION add(int,int) SET client_min_messages TO warning;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION add(int,int) SET client_min_messages TO error;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION add(int,int) SET client_min_messages TO debug;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION add(int,int) RESET client_min_messages;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
-- SET ... FROM CURRENT is not supported, verify the query fails with a descriptive error irregardless of where in the action list the statement occurs
ALTER FUNCTION add(int,int) SET client_min_messages FROM CURRENT;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION add(int,int) RETURNS NULL ON NULL INPUT SET client_min_messages FROM CURRENT;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION add(int,int) SET client_min_messages FROM CURRENT SECURITY DEFINER;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
-- rename function and make sure the new name can be used on the workers while the old name can't
ALTER FUNCTION add(int,int) RENAME TO add2;
SELECT public.verify_function_is_same_on_workers('function_tests.add2(int,int)');
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
SELECT * FROM run_command_on_workers('SELECT function_tests.add2(2,3);') ORDER BY 1,2;
ALTER FUNCTION add2(int,int) RENAME TO add;
-- change the owner of the function and verify the owner has been changed on the workers
ALTER FUNCTION add(int,int) OWNER TO functionuser;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
SELECT run_command_on_workers($$
SELECT row(usename, nspname, proname)
FROM pg_proc
JOIN pg_user ON (usesysid = proowner)
JOIN pg_namespace ON (pg_namespace.oid = pronamespace)
WHERE proname = 'add';
$$);
-- change the schema of the function and verify the old schema doesn't exist anymore while
-- the new schema has the function.
ALTER FUNCTION add(int,int) SET SCHEMA function_tests2;
SELECT public.verify_function_is_same_on_workers('function_tests2.add(int,int)');
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
SELECT * FROM run_command_on_workers('SELECT function_tests2.add(2,3);') ORDER BY 1,2;
ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests;
DROP FUNCTION add(int,int);
-- call should fail as function should have been dropped
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
-- postgres doesn't accept parameter names in the regprocedure input
SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1');
@ -215,6 +283,7 @@ SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isa
SET client_min_messages TO error; -- suppress cascading objects dropping
DROP SCHEMA function_tests CASCADE;
DROP SCHEMA function_tests2 CASCADE;
-- This is hacky, but we should clean-up the resources as below
@ -224,6 +293,7 @@ UPDATE pg_dist_local_group SET groupid = 0;
SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%';
TRUNCATE pg_dist_node;
DROP SCHEMA function_tests CASCADE;
DROP SCHEMA function_tests2 CASCADE;
\c - - - :worker_2_port
SET client_min_messages TO error; -- suppress cascading objects dropping
@ -231,6 +301,7 @@ UPDATE pg_dist_local_group SET groupid = 0;
SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%';
TRUNCATE pg_dist_node;
DROP SCHEMA function_tests CASCADE;
DROP SCHEMA function_tests2 CASCADE;
\c - - - :master_port

View File

@ -0,0 +1,90 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 11 AS server_verion_eleven_and_above
\gset
\if :server_verion_eleven_and_above
\else
\q
\endif
SET citus.next_shard_id TO 20030000;
CREATE USER procedureuser;
SELECT run_command_on_workers($$CREATE USER procedureuser;$$);
CREATE SCHEMA procedure_tests AUTHORIZATION procedureuser;
CREATE SCHEMA procedure_tests2 AUTHORIZATION procedureuser;
SET search_path TO procedure_tests;
SET citus.shard_count TO 4;
-- Create and distribute a simple function
CREATE OR REPLACE PROCEDURE raise_info(text)
LANGUAGE PLPGSQL AS $proc$
BEGIN
RAISE INFO 'information message %', $1;
END;
$proc$;
-- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists.
CREATE TABLE colocation_table(id text);
SELECT create_distributed_table('colocation_table','id');
SELECT create_distributed_function('raise_info(text)', '$1');
SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
-- testing alter statements for a distributed function
-- ROWS 5, untested because;
-- ERROR: ROWS is not applicable when function does not return a set
ALTER PROCEDURE raise_info(text) SECURITY INVOKER;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
ALTER PROCEDURE raise_info(text) SECURITY DEFINER;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
-- Test SET/RESET for alter procedure
ALTER PROCEDURE raise_info(text) SET client_min_messages TO warning;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
ALTER PROCEDURE raise_info(text) SET client_min_messages TO error;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
ALTER PROCEDURE raise_info(text) SET client_min_messages TO debug;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
ALTER PROCEDURE raise_info(text) RESET client_min_messages;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
-- rename function and make sure the new name can be used on the workers while the old name can't
ALTER PROCEDURE raise_info(text) RENAME TO raise_info2;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info2(text)');
SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info2('hello');$$) ORDER BY 1,2;
ALTER PROCEDURE raise_info2(text) RENAME TO raise_info;
-- change the owner of the function and verify the owner has been changed on the workers
ALTER PROCEDURE raise_info(text) OWNER TO procedureuser;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
SELECT run_command_on_workers($$
SELECT row(usename, nspname, proname)
FROM pg_proc
JOIN pg_user ON (usesysid = proowner)
JOIN pg_namespace ON (pg_namespace.oid = pronamespace)
WHERE proname = 'raise_info';
$$);
-- change the schema of the procedure and verify the old schema doesn't exist anymore while
-- the new schema has the function.
ALTER PROCEDURE raise_info(text) SET SCHEMA procedure_tests2;
SELECT public.verify_function_is_same_on_workers('procedure_tests2.raise_info(text)');
SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$CALL procedure_tests2.raise_info('hello');$$) ORDER BY 1,2;
ALTER PROCEDURE procedure_tests2.raise_info(text) SET SCHEMA procedure_tests;
DROP PROCEDURE raise_info(text);
-- call should fail as procedure should have been dropped
SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2;
SET client_min_messages TO error; -- suppress cascading objects dropping
DROP SCHEMA procedure_tests CASCADE;
SELECT run_command_on_workers($$DROP SCHEMA procedure_tests CASCADE;$$);
DROP SCHEMA procedure_tests2 CASCADE;
SELECT run_command_on_workers($$DROP SCHEMA procedure_tests2 CASCADE;$$);
DROP USER procedureuser;
SELECT run_command_on_workers($$DROP USER procedureuser;$$);

View File

@ -32,6 +32,7 @@
-- [ CASCADE | RESTRICT ]
SET citus.next_shard_id TO 20020000;
SET citus.enable_ddl_propagation TO off;
CREATE SCHEMA function_tests;
SET search_path TO function_tests;

View File

@ -1,3 +1,10 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 11 AS server_verion_eleven_and_above
\gset
\if :server_verion_eleven_and_above
\else
\q
\endif
--
-- Regression tests for deparsing ALTER/DROP PROCEDURE Queries
--
@ -29,6 +36,7 @@
-- allow deparsing them here.
SET citus.next_shard_id TO 20030000;
SET citus.enable_ddl_propagation TO off;
CREATE SCHEMA procedure_tests;
SET search_path TO procedure_tests;
@ -167,30 +175,43 @@ SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info RESET ALL
$cmd$);
-- rename and rename back to keep the nodes in sync
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info RENAME TO summation
ALTER PROCEDURE raise_info RENAME TO summation;
$cmd$);
ALTER PROCEDURE raise_info RENAME TO summation;
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE summation RENAME TO raise_info;
$cmd$);
ALTER PROCEDURE summation RENAME TO raise_info;
CREATE ROLE PROCEDURE_role;
CREATE ROLE procedure_role;
SELECT run_command_on_workers($$CREATE ROLE procedure_role;$$);
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info OWNER TO PROCEDURE_role
ALTER PROCEDURE raise_info OWNER TO procedure_role
$cmd$);
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info OWNER TO missing_role
$cmd$);
-- move schema and back to keep the nodes in sync
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info SET SCHEMA public
ALTER PROCEDURE raise_info SET SCHEMA public;
$cmd$);
ALTER PROCEDURE raise_info SET SCHEMA public;
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE public.raise_info SET SCHEMA procedure_tests;
$cmd$);
ALTER PROCEDURE public.raise_info SET SCHEMA procedure_tests;
SELECT deparse_and_run_on_workers($cmd$
ALTER PROCEDURE raise_info DEPENDS ON EXTENSION citus
$cmd$);
SELECT deparse_and_run_on_workers($cmd$
DROP PROCEDURE IF EXISTS raise_info(int,int);
DROP PROCEDURE raise_info(text);
$cmd$);
-- Check that an invalid PROCEDURE name is still parsed correctly
@ -209,4 +230,5 @@ $cmd$);
-- clear objects
SET client_min_messages TO WARNING; -- suppress cascading objects dropping
DROP SCHEMA procedure_tests CASCADE;
DROP ROLE PROCEDURE_role;
DROP ROLE procedure_role;
SELECT run_command_on_workers($$DROP ROLE procedure_role;$$);

View File

@ -53,6 +53,9 @@ WHERE pgd.refclassid = 'pg_extension'::regclass AND
DROP EXTENSION citus;
\c
-- these tests switch between citus versions and call ddl's that require pg_dist_object to be created
SET citus.enable_object_propagation TO 'false';
SET citus.enable_version_checks TO 'false';
CREATE EXTENSION citus VERSION '7.0-1';

View File

@ -152,4 +152,23 @@ BEGIN
where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid;
END;$$;
-- helper function to verify the function of a coordinator is the same on all workers
CREATE OR REPLACE FUNCTION verify_function_is_same_on_workers(funcname text)
RETURNS bool
LANGUAGE plpgsql
AS $func$
DECLARE
coordinatorSql text;
workerSql text;
BEGIN
SELECT pg_get_functiondef(funcname::regprocedure) INTO coordinatorSql;
FOR workerSql IN SELECT result FROM run_command_on_workers('SELECT pg_get_functiondef(' || quote_literal(funcname) || '::regprocedure)') LOOP
IF workerSql != coordinatorSql THEN
RAISE INFO 'functions are different, coordinator:% worker:%', coordinatorSql, workerSql;
RETURN false;
END IF;
END LOOP;
RETURN true;
END;
$func$;