diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index c952babd4..89440ed22 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -182,6 +182,11 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency) return CreateTypeDDLCommandsIdempotent(dependency); } + case OCLASS_EXTENSION: + { + return CreateExtensionDDLCommand(dependency); + } + default: { break; diff --git a/src/backend/distributed/commands/extension.c b/src/backend/distributed/commands/extension.c index 80f779bac..60a423f5f 100644 --- a/src/backend/distributed/commands/extension.c +++ b/src/backend/distributed/commands/extension.c @@ -11,35 +11,32 @@ #include "postgres.h" #include "citus_version.h" +#include "catalog/pg_extension_d.h" +#include "commands/extension.h" +#include "distributed/citus_ruleutils.h" #include "distributed/commands.h" -#include "distributed/metadata_cache.h" -#include "nodes/parsenodes.h" +#include "distributed/commands/utility_hook.h" +#include "distributed/deparser.h" +#include "distributed/metadata_sync.h" +#include "distributed/metadata/distobject.h" +#include "distributed/multi_executor.h" +#include "distributed/relation_access_tracking.h" +#include "distributed/transaction_management.h" +#include "nodes/makefuncs.h" +#include "utils/lsyscache.h" +#include "utils/builtins.h" + /* Local functions forward declarations for helper functions */ -static char * ExtractNewExtensionVersion(Node *parsetree); - - -/* - * IsCitusExtensionStmt returns whether a given utility is a CREATE or ALTER - * EXTENSION statement which references the citus extension. This function - * returns false for all other inputs. - */ -bool -IsCitusExtensionStmt(Node *parsetree) -{ - char *extensionName = ""; - - if (IsA(parsetree, CreateExtensionStmt)) - { - extensionName = ((CreateExtensionStmt *) parsetree)->extname; - } - else if (IsA(parsetree, AlterExtensionStmt)) - { - extensionName = ((AlterExtensionStmt *) parsetree)->extname; - } - - return (strcmp(extensionName, "citus") == 0); -} +static char * ExtractNewExtensionVersion(Node *parseTree); +static void AddSchemaFieldIfMissing(CreateExtensionStmt *stmt); +static List * FilterDistributedExtensions(List *extensionObjectList); +static List * ExtensionNameListToObjectAddressList(List *extensionObjectList); +static void EnsureSequentialModeForExtensionDDL(void); +static bool ShouldPropagateExtensionCommand(Node *parseTree); +static bool IsDropCitusStmt(Node *parseTree); +static bool IsAlterExtensionSetSchemaCitus(Node *parseTree); +static Node * RecreateExtensionStmt(Oid extensionOid); /* @@ -49,9 +46,9 @@ IsCitusExtensionStmt(Node *parsetree) * out. It ignores the schema version. */ void -ErrorIfUnstableCreateOrAlterExtensionStmt(Node *parsetree) +ErrorIfUnstableCreateOrAlterExtensionStmt(Node *parseTree) { - char *newExtensionVersion = ExtractNewExtensionVersion(parsetree); + char *newExtensionVersion = ExtractNewExtensionVersion(parseTree); if (newExtensionVersion != NULL) { @@ -78,24 +75,24 @@ ErrorIfUnstableCreateOrAlterExtensionStmt(Node *parsetree) /* - * ExtractNewExtensionVersion returns the new extension version specified by - * a CREATE or ALTER EXTENSION statement. Other inputs are not permitted. This - * function returns NULL for statements with no explicit version specified. + * ExtractNewExtensionVersion returns the palloc'd new extension version specified + * by a CREATE or ALTER EXTENSION statement. Other inputs are not permitted. + * This function returns NULL for statements with no explicit version specified. */ static char * -ExtractNewExtensionVersion(Node *parsetree) +ExtractNewExtensionVersion(Node *parseTree) { - char *newVersion = NULL; - List *optionsList = NIL; - ListCell *optionsCell = NULL; + Value *newVersionValue = NULL; - if (IsA(parsetree, CreateExtensionStmt)) + List *optionsList = NIL; + + if (IsA(parseTree, CreateExtensionStmt)) { - optionsList = ((CreateExtensionStmt *) parsetree)->options; + optionsList = ((CreateExtensionStmt *) parseTree)->options; } - else if (IsA(parsetree, AlterExtensionStmt)) + else if (IsA(parseTree, AlterExtensionStmt)) { - optionsList = ((AlterExtensionStmt *) parsetree)->options; + optionsList = ((AlterExtensionStmt *) parseTree)->options; } else { @@ -103,15 +100,739 @@ ExtractNewExtensionVersion(Node *parsetree) Assert(false); } - foreach(optionsCell, optionsList) + newVersionValue = GetExtensionOption(optionsList, "new_version"); + + /* return target string safely */ + if (newVersionValue) { - DefElem *defElement = (DefElem *) lfirst(optionsCell); - if (strncmp(defElement->defname, "new_version", NAMEDATALEN) == 0) + const char *newVersion = strVal(newVersionValue); + + return pstrdup(newVersion); + } + else + { + return NULL; + } +} + + +/* + * PlanCreateExtensionStmt is called during the creation of an extension. + * It is executed before the statement is applied locally. + * We decide if the extension needs to be replicated to the worker, and + * if that is the case return a list of DDLJob's that describe how and + * where the extension needs to be created. + */ +List * +PlanCreateExtensionStmt(CreateExtensionStmt *createExtensionStmt, const char *queryString) +{ + List *commands = NIL; + const char *createExtensionStmtSql = NULL; + + if (!ShouldPropagateExtensionCommand((Node *) createExtensionStmt)) + { + return NIL; + } + + /* + * If the extension command is a part of a bigger multi-statement transaction, + * do not propagate it + */ + if (IsMultiStatementTransaction()) + { + return NIL; + } + + /* extension management can only be done via coordinator node */ + EnsureCoordinator(); + + /* + * Make sure that no new nodes are added after this point until the end of the + * transaction by taking a RowShareLock on pg_dist_node, which conflicts with the + * ExclusiveLock taken by master_add_node. + * This guarantees that all active nodes will have the extension, because they will + * either get it now, or get it in master_add_node after this transaction finishes and + * the pg_dist_object record becomes visible. + */ + LockRelationOid(DistNodeRelationId(), RowShareLock); + + /* + * Make sure that the current transaction is already in sequential mode, + * or can still safely be put in sequential mode + */ + EnsureSequentialModeForExtensionDDL(); + + /* + * Here we append "schema" field to the "options" list (if not specified) + * to satisfy the schema consistency between worker nodes and the coordinator. + */ + AddSchemaFieldIfMissing(createExtensionStmt); + + createExtensionStmtSql = DeparseTreeNode((Node *) createExtensionStmt); + + /* + * To prevent recursive propagation in mx architecture, we disable ddl + * propagation before sending the command to workers. + */ + commands = list_make3(DISABLE_DDL_PROPAGATION, + (void *) createExtensionStmtSql, + ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(ALL_WORKERS, commands); +} + + +/* + * AddSchemaFieldIfMissing adds DefElem item for "schema" (if not specified + * in statement) to "options" list before deparsing the statement to satisfy + * the schema consistency between worker nodes and the coordinator. + */ +static void +AddSchemaFieldIfMissing(CreateExtensionStmt *createExtensionStmt) +{ + List *optionsList = createExtensionStmt->options; + + Value *schemaNameValue = GetExtensionOption(optionsList, "schema"); + + if (!schemaNameValue) + { + /* + * As we already created the extension by standard_ProcessUtility, + * we actually know the schema it belongs to + */ + bool missingOk = false; + Oid extensionOid = get_extension_oid(createExtensionStmt->extname, missingOk); + Oid extensionSchemaOid = get_extension_schema(extensionOid); + char *extensionSchemaName = get_namespace_name(extensionSchemaOid); + + Node *schemaNameArg = (Node *) makeString(extensionSchemaName); + + /* set location to -1 as it is unknown */ + int location = -1; + + DefElem *newDefElement = makeDefElem("schema", schemaNameArg, location); + + createExtensionStmt->options = lappend(createExtensionStmt->options, + newDefElement); + } +} + + +/* + * ProcessCreateExtensionStmt is executed after the extension has been + * created locally and before we create it on the worker nodes. + * As we now have access to ObjectAddress of the extension that is just + * created, we can mark it as distributed to make sure that its + * dependencies exist on all nodes. + */ +void +ProcessCreateExtensionStmt(CreateExtensionStmt *createExtensionStmt, const + char *queryString) +{ + const ObjectAddress *extensionAddress = NULL; + + if (!ShouldPropagateExtensionCommand((Node *) createExtensionStmt)) + { + return; + } + + + /* + * If the extension command is a part of a bigger multi-statement transaction, + * do not propagate it + */ + if (IsMultiStatementTransaction()) + { + return; + } + + extensionAddress = GetObjectAddressFromParseTree((Node *) createExtensionStmt, false); + + EnsureDependenciesExistsOnAllNodes(extensionAddress); + + MarkObjectDistributed(extensionAddress); +} + + +/* + * PlanDropExtensionStmt is called to drop extension(s) in coordinator and + * in worker nodes if distributed before. + * We first ensure that we keep only the distributed ones before propagating + * the statement to worker nodes. + * If no extensions in the drop list are distributed, then no calls will + * be made to the workers. + */ +List * +PlanDropExtensionStmt(DropStmt *dropStmt, const char *queryString) +{ + List *allDroppedExtensions = dropStmt->objects; + + List *distributedExtensions = NIL; + List *distributedExtensionAddresses = NIL; + + List *commands = NIL; + const char *deparsedStmt = NULL; + + ListCell *addressCell = NULL; + + if (!ShouldPropagateExtensionCommand((Node *) dropStmt)) + { + return NIL; + } + + /* get distributed extensions to be dropped in worker nodes as well */ + distributedExtensions = FilterDistributedExtensions(allDroppedExtensions); + + if (list_length(distributedExtensions) <= 0) + { + /* no distributed extensions to drop */ + return NIL; + } + + /* extension management can only be done via coordinator node */ + EnsureCoordinator(); + + /* + * Make sure that no new nodes are added after this point until the end of the + * transaction by taking a RowShareLock on pg_dist_node, which conflicts with the + * ExclusiveLock taken by master_add_node. + * This guarantees that all active nodes will drop the extension, because they will + * either get it now, or get it in master_add_node after this transaction finishes and + * the pg_dist_object record becomes visible. + */ + LockRelationOid(DistNodeRelationId(), RowShareLock); + + /* + * Make sure that the current transaction is already in sequential mode, + * or can still safely be put in sequential mode + */ + EnsureSequentialModeForExtensionDDL(); + + distributedExtensionAddresses = ExtensionNameListToObjectAddressList( + distributedExtensions); + + /* unmark each distributed extension */ + foreach(addressCell, distributedExtensionAddresses) + { + ObjectAddress *address = (ObjectAddress *) lfirst(addressCell); + UnmarkObjectDistributed(address); + } + + /* + * Temporary swap the lists of objects to delete with the distributed + * objects and deparse to an sql statement for the workers. + * Then switch back to allDroppedExtensions to drop all specified + * extensions in coordinator after PlanDropExtensionStmt completes + * its execution. + */ + dropStmt->objects = distributedExtensions; + deparsedStmt = DeparseTreeNode((Node *) dropStmt); + + dropStmt->objects = allDroppedExtensions; + + /* + * To prevent recursive propagation in mx architecture, we disable ddl + * propagation before sending the command to workers. + */ + commands = list_make3(DISABLE_DDL_PROPAGATION, + (void *) deparsedStmt, + ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(ALL_WORKERS, commands); +} + + +/* + * FilterDistributedExtensions returns the distributed objects in an "objects" + * list of a DropStmt, a list having the format of a "DropStmt.objects" list. + * That is, in turn, a list of string "Value"s. + */ +static List * +FilterDistributedExtensions(List *extensionObjectList) +{ + List *extensionNameList = NIL; + + bool missingOk = true; + ListCell *objectCell = NULL; + + foreach(objectCell, extensionObjectList) + { + char *extensionName = strVal(lfirst(objectCell)); + + ObjectAddress *address = palloc0(sizeof(ObjectAddress)); + + Oid extensionOid = get_extension_oid(extensionName, missingOk); + + if (!OidIsValid(extensionOid)) { - newVersion = strVal(defElement->arg); - break; + continue; + } + + ObjectAddressSet(*address, ExtensionRelationId, extensionOid); + + if (!IsObjectDistributed(address)) + { + continue; + } + + extensionNameList = lappend(extensionNameList, makeString(extensionName)); + } + + return extensionNameList; +} + + +/* + * ExtensionNameListToObjectAddressList returns the object addresses in + * an ObjectAddress list for an "objects" list of a DropStmt. + * Callers of this function should ensure that all the objects in the list + * are valid and distributed. + */ +static List * +ExtensionNameListToObjectAddressList(List *extensionObjectList) +{ + List *extensionObjectAddressList = NIL; + + ListCell *objectCell = NULL; + + foreach(objectCell, extensionObjectList) + { + /* + * We set missingOk to false as we assume all the objects in + * extensionObjectList list are valid and distributed. + */ + bool missingOk = false; + + const char *extensionName = strVal(lfirst(objectCell)); + + ObjectAddress *address = palloc0(sizeof(ObjectAddress)); + + Oid extensionOid = get_extension_oid(extensionName, missingOk); + + ObjectAddressSet(*address, ExtensionRelationId, extensionOid); + + extensionObjectAddressList = lappend(extensionObjectAddressList, address); + } + + return extensionObjectAddressList; +} + + +/* + * PlanAlterExtensionSchemaStmt is invoked for alter extension set schema statements. + */ +List * +PlanAlterExtensionSchemaStmt(AlterObjectSchemaStmt *alterExtensionStmt, const + char *queryString) +{ + const char *alterExtensionStmtSql = NULL; + List *commands = NIL; + + if (!ShouldPropagateExtensionCommand((Node *) alterExtensionStmt)) + { + return NIL; + } + + /* extension management can only be done via coordinator node */ + EnsureCoordinator(); + + /* + * Make sure that no new nodes are added after this point until the end of the + * transaction by taking a RowShareLock on pg_dist_node, which conflicts with the + * ExclusiveLock taken by master_add_node. + * This guarantees that all active nodes will update the extension schema after + * this transaction finishes and the pg_dist_object record becomes visible. + */ + LockRelationOid(DistNodeRelationId(), RowShareLock); + + /* + * Make sure that the current transaction is already in sequential mode, + * or can still safely be put in sequential mode + */ + EnsureSequentialModeForExtensionDDL(); + + alterExtensionStmtSql = DeparseTreeNode((Node *) alterExtensionStmt); + + /* + * To prevent recursive propagation in mx architecture, we disable ddl + * propagation before sending the command to workers. + */ + commands = list_make3(DISABLE_DDL_PROPAGATION, + (void *) alterExtensionStmtSql, + ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(ALL_WORKERS, commands); +} + + +/* + * ProcessAlterExtensionSchemaStmt is executed after the change has been applied + * locally, we can now use the new dependencies (schema) of the extension to ensure + * all its dependencies exist on the workers before we apply the commands remotely. + */ +void +ProcessAlterExtensionSchemaStmt(AlterObjectSchemaStmt *alterExtensionStmt, const + char *queryString) +{ + const ObjectAddress *extensionAddress = NULL; + + extensionAddress = GetObjectAddressFromParseTree((Node *) alterExtensionStmt, false); + + if (!ShouldPropagateExtensionCommand((Node *) alterExtensionStmt)) + { + return; + } + + /* dependencies (schema) have changed let's ensure they exist */ + EnsureDependenciesExistsOnAllNodes(extensionAddress); +} + + +/* + * PlanAlterExtensionUpdateStmt is invoked for alter extension update statements. + */ +List * +PlanAlterExtensionUpdateStmt(AlterExtensionStmt *alterExtensionStmt, const + char *queryString) +{ + const char *alterExtensionStmtSql = NULL; + List *commands = NIL; + + if (!ShouldPropagateExtensionCommand((Node *) alterExtensionStmt)) + { + return NIL; + } + + /* extension management can only be done via coordinator node */ + EnsureCoordinator(); + + /* + * Make sure that no new nodes are added after this point until the end of the + * transaction by taking a RowShareLock on pg_dist_node, which conflicts with the + * ExclusiveLock taken by master_add_node. + * This guarantees that all active nodes will update the extension version, because + * they will either get it now, or get it in master_add_node after this transaction + * finishes and the pg_dist_object record becomes visible. + */ + LockRelationOid(DistNodeRelationId(), RowShareLock); + + /* + * Make sure that the current transaction is already in sequential mode, + * or can still safely be put in sequential mode + */ + EnsureSequentialModeForExtensionDDL(); + + alterExtensionStmtSql = DeparseTreeNode((Node *) alterExtensionStmt); + + /* + * To prevent recursive propagation in mx architecture, we disable ddl + * propagation before sending the command to workers. + */ + commands = list_make3(DISABLE_DDL_PROPAGATION, + (void *) alterExtensionStmtSql, + ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(ALL_WORKERS, commands); +} + + +/* + * EnsureSequentialModeForExtensionDDL makes sure that the current transaction is already in + * sequential mode, or can still safely be put in sequential mode, it errors if that is + * not possible. The error contains information for the user to retry the transaction with + * sequential mode set from the beginnig. + * + * As extensions are node scoped objects there exists only 1 instance of the + * extension used by potentially multiple shards. To make sure all shards in + * the transaction can interact with the extension the extension needs to be + * visible on all connections used by the transaction, meaning we can only use + * 1 connection per node. + */ +static void +EnsureSequentialModeForExtensionDDL(void) +{ + if (ParallelQueryExecutedInTransaction()) + { + ereport(ERROR, (errmsg("cannot run extension command because there was a " + "parallel operation on a distributed table in the " + "transaction"), + errdetail( + "When running command on/for a distributed extension, Citus needs to " + "perform all operations over a single connection per " + "node to ensure consistency."), + errhint("Try re-running the transaction with " + "\"SET LOCAL citus.multi_shard_modify_mode TO " + "\'sequential\';\""))); + } + + ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), + errdetail( + "A command for a distributed extension is run. To make sure subsequent " + "commands see the type correctly we need to make sure to " + "use only one connection for all future commands"))); + SetLocalMultiShardModifyModeToSequential(); +} + + +/* + * ShouldPropagateExtensionCommand determines whether to propagate an extension + * command to the worker nodes. + */ +static bool +ShouldPropagateExtensionCommand(Node *parseTree) +{ + /* if we disabled object propagation, then we should not propagate anything. */ + if (!EnableDependencyCreation) + { + return false; + } + + /* + * If extension command is run for/on citus, leave the rest to standard utility hook + * by returning false. + */ + if (IsCreateAlterExtensionUpdateCitusStmt(parseTree)) + { + return false; + } + else if (IsDropCitusStmt(parseTree)) + { + return false; + } + else if (IsAlterExtensionSetSchemaCitus(parseTree)) + { + return false; + } + + return true; +} + + +/* + * IsCreateAlterExtensionUpdateCitusStmt returns whether a given utility is a + * CREATE or ALTER EXTENSION UPDATE statement which references the citus extension. + * This function returns false for all other inputs. + */ +bool +IsCreateAlterExtensionUpdateCitusStmt(Node *parseTree) +{ + const char *extensionName = NULL; + + if (IsA(parseTree, CreateExtensionStmt)) + { + extensionName = ((CreateExtensionStmt *) parseTree)->extname; + } + else if (IsA(parseTree, AlterExtensionStmt)) + { + extensionName = ((AlterExtensionStmt *) parseTree)->extname; + } + else + { + /* + * If it is not a Create Extension or a Alter Extension stmt, + * it does not matter if the it is about citus + */ + return false; + } + + /* + * Now that we have CreateExtensionStmt or AlterExtensionStmt, + * check if it is run for/on citus + */ + return (strncasecmp(extensionName, CITUS_NAME, NAMEDATALEN) == 0); +} + + +/* + * IsDropCitusStmt iterates the objects to be dropped in a drop statement + * and try to find citus there. + */ +static bool +IsDropCitusStmt(Node *parseTree) +{ + ListCell *objectCell = NULL; + + /* if it is not a DropStmt, it is needless to search for citus */ + if (!IsA(parseTree, DropStmt)) + { + return false; + } + + /* now that we have a DropStmt, check if citus is among the objects to dropped */ + foreach(objectCell, ((DropStmt *) parseTree)->objects) + { + const char *extensionName = strVal(lfirst(objectCell)); + + if (strncasecmp(extensionName, CITUS_NAME, NAMEDATALEN) == 0) + { + return true; } } - return newVersion; + return false; +} + + +/* + * IsAlterExtensionSetSchemaCitus returns whether a given utility is an + * ALTER EXTENSION SET SCHEMA statement which references the citus extension. + * This function returns false for all other inputs. + */ +static bool +IsAlterExtensionSetSchemaCitus(Node *parseTree) +{ + const char *extensionName = NULL; + + if (IsA(parseTree, AlterObjectSchemaStmt)) + { + AlterObjectSchemaStmt *alterExtensionSetSchemaStmt = + (AlterObjectSchemaStmt *) parseTree; + + if (alterExtensionSetSchemaStmt->objectType == OBJECT_EXTENSION) + { + extensionName = strVal(((AlterObjectSchemaStmt *) parseTree)->object); + + /* + * Now that we have AlterObjectSchemaStmt for an extension, + * check if it is run for/on citus + */ + return (strncasecmp(extensionName, CITUS_NAME, NAMEDATALEN) == 0); + } + } + + return false; +} + + +/* + * CreateExtensionDDLCommand returns a list of DDL statements (const char *) to be + * executed on a node to recreate the extension addressed by the extensionAddress. + */ +List * +CreateExtensionDDLCommand(const ObjectAddress *extensionAddress) +{ + List *ddlCommands = NIL; + const char *ddlCommand = NULL; + + Node *stmt = NULL; + + /* generate a statement for creation of the extension in "if not exists" construct */ + stmt = RecreateExtensionStmt(extensionAddress->objectId); + + /* capture ddl command for the create statement */ + ddlCommand = DeparseTreeNode(stmt); + + ddlCommands = list_make1((void *) ddlCommand); + + return ddlCommands; +} + + +/* + * RecreateEnumStmt returns a parsetree for a CREATE EXTENSION statement that would + * recreate the given extension on a new node. + */ +static Node * +RecreateExtensionStmt(Oid extensionOid) +{ + CreateExtensionStmt *createExtensionStmt = makeNode(CreateExtensionStmt); + + char *extensionName = get_extension_name(extensionOid); + + if (!extensionName) + { + ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("extension with oid %u does not exist", + extensionOid))); + } + + /* schema DefElement related variables */ + Oid extensionSchemaOid = InvalidOid; + char *extensionSchemaName = NULL; + Node *schemaNameArg = NULL; + + /* set location to -1 as it is unknown */ + int location = -1; + DefElem *schemaDefElement = NULL; + + /* set extension name and if_not_exists fields */ + createExtensionStmt->extname = extensionName; + createExtensionStmt->if_not_exists = true; + + /* get schema name that extension was created on */ + extensionSchemaOid = get_extension_schema(extensionOid); + extensionSchemaName = get_namespace_name(extensionSchemaOid); + + /* make DefEleme for extensionSchemaName */ + schemaNameArg = (Node *) makeString(extensionSchemaName); + + schemaDefElement = makeDefElem("schema", schemaNameArg, location); + + /* append the schema name DefElem finally */ + createExtensionStmt->options = lappend(createExtensionStmt->options, + schemaDefElement); + + return (Node *) createExtensionStmt; +} + + +/* + * AlterExtensionSchemaStmtObjectAddress returns the ObjectAddress of the extension that is + * the subject of the AlterObjectSchemaStmt. Errors if missing_ok is false. + */ +const ObjectAddress * +AlterExtensionSchemaStmtObjectAddress(AlterObjectSchemaStmt *alterExtensionSchemaStmt, + bool missing_ok) +{ + ObjectAddress *extensionAddress = NULL; + Oid extensionOid = InvalidOid; + const char *extensionName = NULL; + + Assert(alterExtensionSchemaStmt->objectType == OBJECT_EXTENSION); + + extensionName = strVal(alterExtensionSchemaStmt->object); + + extensionOid = get_extension_oid(extensionName, missing_ok); + + if (extensionOid == InvalidOid) + { + ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("extension \"%s\" does not exist", + extensionName))); + } + + extensionAddress = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*extensionAddress, ExtensionRelationId, extensionOid); + + return extensionAddress; +} + + +/* + * AlterExtensionUpdateStmtObjectAddress returns the ObjectAddress of the extension that is + * the subject of the AlterExtensionStmt. Errors if missing_ok is false. + */ +const ObjectAddress * +AlterExtensionUpdateStmtObjectAddress(AlterExtensionStmt *alterExtensionStmt, + bool missing_ok) +{ + ObjectAddress *extensionAddress = NULL; + Oid extensionOid = InvalidOid; + const char *extensionName = NULL; + + extensionName = alterExtensionStmt->extname; + + extensionOid = get_extension_oid(extensionName, missing_ok); + + if (extensionOid == InvalidOid) + { + ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("extension \"%s\" does not exist", + extensionName))); + } + + extensionAddress = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*extensionAddress, ExtensionRelationId, extensionOid); + + return extensionAddress; } diff --git a/src/backend/distributed/commands/schema.c b/src/backend/distributed/commands/schema.c index d6e19335b..8bb29eb39 100644 --- a/src/backend/distributed/commands/schema.c +++ b/src/backend/distributed/commands/schema.c @@ -128,6 +128,11 @@ PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt, const char *queryString) return PlanAlterFunctionSchemaStmt(stmt, queryString); } + case OBJECT_EXTENSION: + { + return PlanAlterExtensionSchemaStmt(stmt, queryString); + } + default: { /* do nothing for unsupported objects */ @@ -205,6 +210,12 @@ ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt, const char *queryStrin return; } + case OBJECT_EXTENSION: + { + ProcessAlterExtensionSchemaStmt(stmt, queryString); + return; + } + default: { /* do nothing for unsupported objects */ diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index afab20f82..34744a31f 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -117,7 +117,7 @@ multi_ProcessUtility(PlannedStmt *pstmt, { Node *parsetree = pstmt->utilityStmt; List *ddlJobs = NIL; - bool checkExtensionVersion = false; + bool checkCreateAlterExtensionVersion = false; if (IsA(parsetree, TransactionStmt) || IsA(parsetree, LockStmt) || @@ -143,13 +143,12 @@ multi_ProcessUtility(PlannedStmt *pstmt, return; } - checkExtensionVersion = IsCitusExtensionStmt(parsetree); - if (EnableVersionChecks && checkExtensionVersion) + checkCreateAlterExtensionVersion = IsCreateAlterExtensionUpdateCitusStmt(parsetree); + if (EnableVersionChecks && checkCreateAlterExtensionVersion) { ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree); } - if (!CitusHasBeenLoaded()) { /* @@ -430,6 +429,13 @@ multi_ProcessUtility(PlannedStmt *pstmt, case OBJECT_FUNCTION: { ddlJobs = PlanDropFunctionStmt(dropStatement, queryString); + break; + } + + case OBJECT_EXTENSION: + { + ddlJobs = PlanDropExtensionStmt(dropStatement, queryString); + break; } default: @@ -565,6 +571,20 @@ multi_ProcessUtility(PlannedStmt *pstmt, castNode(AlterObjectDependsStmt, parsetree), queryString); } + if (IsA(parsetree, AlterExtensionStmt)) + { + ddlJobs = PlanAlterExtensionUpdateStmt(castNode(AlterExtensionStmt, + parsetree), queryString); + } + + if (IsA(parsetree, AlterExtensionContentsStmt)) + { + ereport(NOTICE, (errmsg( + "Citus does not propagate adding/dropping member objects"), + errhint( + "You can add/drop the member objects on the workers as well."))); + } + /* * ALTER TABLE ALL IN TABLESPACE statements have their node type as * AlterTableMoveAllStmt. At the moment we do not support this functionality in @@ -795,6 +815,20 @@ multi_ProcessUtility(PlannedStmt *pstmt, ProcessAlterTableStmtAttachPartition(alterTableStatement); } + /* + * We call PlanCreateExtensionStmt and ProcessCreateExtensionStmt after standard_ProcessUtility + * does its work to learn the schema that the extension belongs to (if statement does not include + * WITH SCHEMA clause) + */ + if (EnableDDLPropagation && IsA(parsetree, CreateExtensionStmt)) + { + CreateExtensionStmt *createExtensionStmt = castNode(CreateExtensionStmt, + parsetree); + + ddlJobs = PlanCreateExtensionStmt(createExtensionStmt, queryString); + ProcessCreateExtensionStmt(createExtensionStmt, queryString); + } + /* don't run post-process code for local commands */ if (ddlJobs != NIL) { diff --git a/src/backend/distributed/deparser/deparse.c b/src/backend/distributed/deparser/deparse.c index dcf2807ce..092b29d5b 100644 --- a/src/backend/distributed/deparser/deparse.c +++ b/src/backend/distributed/deparser/deparse.c @@ -37,6 +37,10 @@ static const char * DeparseAlterObjectDependsStmt(AlterObjectDependsStmt *stmt); * * - ALTER FUNCTION, ALTER PROCEDURE, ALTER AGGREGATE * - DROP FUNCTION, DROP PROCEDURE, DROP AGGREGATE + * + * - CREATE EXTENSION + * - ALTER EXTENSION + * - DROP EXTENSION */ const char * DeparseTreeNode(Node *stmt) @@ -98,6 +102,16 @@ DeparseTreeNode(Node *stmt) return DeparseAlterRoleStmt(castNode(AlterRoleStmt, stmt)); } + case T_CreateExtensionStmt: + { + return DeparseCreateExtensionStmt(castNode(CreateExtensionStmt, stmt)); + } + + case T_AlterExtensionStmt: + { + return DeparseAlterExtensionStmt(castNode(AlterExtensionStmt, stmt)); + } + default: { ereport(ERROR, (errmsg("unsupported statement for deparsing"))); @@ -129,6 +143,11 @@ DeparseDropStmt(DropStmt *stmt) return DeparseDropFunctionStmt(stmt); } + case OBJECT_EXTENSION: + { + return DeparseDropExtensionStmt(stmt); + } + default: { ereport(ERROR, (errmsg("unsupported drop statement for deparsing"))); @@ -250,6 +269,11 @@ DeparseAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt) return DeparseAlterFunctionSchemaStmt(stmt); } + case OBJECT_EXTENSION: + { + return DeparseAlterExtensionSchemaStmt(stmt); + } + default: { ereport(ERROR, (errmsg("unsupported rename statement for deparsing"))); diff --git a/src/backend/distributed/deparser/deparse_extension_stmts.c b/src/backend/distributed/deparser/deparse_extension_stmts.c new file mode 100644 index 000000000..9c5a14ac2 --- /dev/null +++ b/src/backend/distributed/deparser/deparse_extension_stmts.c @@ -0,0 +1,280 @@ +/*------------------------------------------------------------------------- + * + * deparse_extension_stmts.c + * All routines to deparse extension statements. + * This file contains deparse functions for extension statement deparsing + * as well as related helper functions. + * + * Copyright (c), Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "catalog/namespace.h" +#include "distributed/deparser.h" +#include "lib/stringinfo.h" +#include "nodes/pg_list.h" +#include "nodes/parsenodes.h" +#include "utils/builtins.h" + +/* Local functions forward declarations for helper functions */ +static void AppendCreateExtensionStmt(StringInfo buf, CreateExtensionStmt *stmt); +static void AppendDropExtensionStmt(StringInfo buf, DropStmt *stmt); +static void AppendExtensionNameList(StringInfo buf, List *objects); +static void AppendAlterExtensionSchemaStmt(StringInfo buf, + AlterObjectSchemaStmt *alterExtensionSchemaStmt); +static void AppendAlterExtensionStmt(StringInfo buf, + AlterExtensionStmt *alterExtensionStmt); + + +/* + * GetExtensionOption returns Value* of DefElem node with "defname" from "options" list + */ +Value * +GetExtensionOption(List *extensionOptions, const char *defname) +{ + Value *targetValue = NULL; + + ListCell *defElemCell = NULL; + + foreach(defElemCell, extensionOptions) + { + DefElem *defElement = (DefElem *) lfirst(defElemCell); + + if (IsA(defElement, DefElem) && strncmp(defElement->defname, defname, + NAMEDATALEN) == 0) + { + targetValue = (Value *) defElement->arg; + break; + } + } + + /* return target string safely */ + if (targetValue) + { + return targetValue; + } + else + { + return NULL; + } +} + + +/* + * DeparseCreateExtensionStmt builds and returns a string representing the + * CreateExtensionStmt to be sent to worker nodes. + */ +const char * +DeparseCreateExtensionStmt(CreateExtensionStmt *createExtensionStmt) +{ + StringInfoData sql = { 0 }; + initStringInfo(&sql); + + AppendCreateExtensionStmt(&sql, createExtensionStmt); + + return sql.data; +} + + +/* + * AppendCreateExtensionStmt appends a string representing the CreateExtensionStmt to a buffer + */ +static void +AppendCreateExtensionStmt(StringInfo buf, CreateExtensionStmt *createExtensionStmt) +{ + const char *extensionName = createExtensionStmt->extname; + + List *optionsList = createExtensionStmt->options; + + /* + * We fetch "new_version", "schema" and "cascade" options from + * optionList as we will append "IF NOT EXISTS" clause regardless of + * statement's content before propagating it to worker nodes. + * We also do not care old_version for now. + */ + Value *schemaNameValue = GetExtensionOption(optionsList, "schema"); + + /* these can be NULL hence check before fetching the stored value */ + Value *newVersionValue = GetExtensionOption(optionsList, "new_version"); + Value *cascadeValue = GetExtensionOption(optionsList, "cascade"); + + /* + * We do not check for if schemaName is NULL as we append it in deparse + * logic if it is not specified. + */ + const char *schemaName = strVal(schemaNameValue); + schemaName = quote_identifier(schemaName); + + appendStringInfo(buf, "CREATE EXTENSION IF NOT EXISTS %s WITH SCHEMA %s", + extensionName, schemaName); + + /* "new_version" may not be specified in CreateExtensionStmt */ + if (newVersionValue) + { + const char *newVersion = strVal(newVersionValue); + newVersion = quote_identifier(newVersion); + + appendStringInfo(buf, " VERSION %s", newVersion); + } + + /* "cascade" may not be specified in CreateExtensionStmt */ + if (cascadeValue) + { + bool cascade = intVal(cascadeValue); + + if (cascade) + { + appendStringInfoString(buf, " CASCADE"); + } + } + + appendStringInfoString(buf, " ;"); +} + + +/* + * DeparseAlterExtensionStmt builds and returns a string representing the + * AlterExtensionStmt to be sent to worker nodes. + */ +const char * +DeparseAlterExtensionStmt(AlterExtensionStmt *alterExtensionStmt) +{ + StringInfoData sql = { 0 }; + initStringInfo(&sql); + + AppendAlterExtensionStmt(&sql, alterExtensionStmt); + + return sql.data; +} + + +/* + * AppendAlterExtensionStmt appends a string representing the AlterExtensionStmt to a buffer + */ +static void +AppendAlterExtensionStmt(StringInfo buf, AlterExtensionStmt *alterExtensionStmt) +{ + const char *extensionName = alterExtensionStmt->extname; + + List *optionsList = alterExtensionStmt->options; + + Value *newVersionValue = GetExtensionOption(optionsList, "new_version"); + + appendStringInfo(buf, "ALTER EXTENSION %s UPDATE ", extensionName); + + /* "new_version" may not be specified in AlterExtensionStmt */ + if (newVersionValue) + { + const char *newVersion = strVal(newVersionValue); + newVersion = quote_identifier(newVersion); + + appendStringInfo(buf, " TO %s", newVersion); + } + + appendStringInfoString(buf, ";"); +} + + +/* + * DeparseDropExtensionStmt builds and returns a string representing the DropStmt + */ +const char * +DeparseDropExtensionStmt(DropStmt *dropStmt) +{ + StringInfoData str = { 0 }; + initStringInfo(&str); + + AppendDropExtensionStmt(&str, dropStmt); + + return str.data; +} + + +/* + * AppendDropExtensionStmt appends a string representing the DropStmt for + * an extension to a buffer. + */ +static void +AppendDropExtensionStmt(StringInfo str, DropStmt *dropStmt) +{ + /* we append "IF NOT EXISTS" clause regardless of the content of the statement. */ + appendStringInfoString(str, "DROP EXTENSION IF EXISTS "); + + /* + * Pick the distributed ones from the "objects" list that is storing + * the object names to be deleted. + */ + AppendExtensionNameList(str, dropStmt->objects); + + /* depending on behaviour field of DropStmt, we should append CASCADE or RESTRICT */ + if (dropStmt->behavior == DROP_CASCADE) + { + appendStringInfoString(str, " CASCADE;"); + } + else + { + appendStringInfoString(str, " RESTRICT;"); + } +} + + +/* + * AppendExtensionNameList appends a string representing the list of + * extension names to a buffer. + */ +static void +AppendExtensionNameList(StringInfo str, List *objects) +{ + ListCell *objectCell = NULL; + + foreach(objectCell, objects) + { + const char *extensionName = strVal(lfirst(objectCell)); + + if (objectCell != list_head(objects)) + { + appendStringInfo(str, ", "); + } + + appendStringInfoString(str, extensionName); + } +} + + +/* + * DeparseAlterExtensionSchemaStmt builds and returns a string representing the + * AlterObjectSchemaStmt (ALTER EXTENSION SET SCHEMA). + */ +const char * +DeparseAlterExtensionSchemaStmt(AlterObjectSchemaStmt *alterExtensionSchemaStmt) +{ + StringInfoData str = { 0 }; + initStringInfo(&str); + + Assert(alterExtensionSchemaStmt->objectType == OBJECT_EXTENSION); + + AppendAlterExtensionSchemaStmt(&str, alterExtensionSchemaStmt); + + return str.data; +} + + +/* + * AppendAlterExtensionSchemaStmt appends a string representing the AlterObjectSchemaStmt + * for an extension to a buffer. + */ +static void +AppendAlterExtensionSchemaStmt(StringInfo buf, + AlterObjectSchemaStmt *alterExtensionSchemaStmt) +{ + const char *extensionName = NULL; + + Assert(alterExtensionSchemaStmt->objectType == OBJECT_EXTENSION); + + extensionName = strVal(alterExtensionSchemaStmt->object); + appendStringInfo(buf, "ALTER EXTENSION %s SET SCHEMA %s;", extensionName, + quote_identifier(alterExtensionSchemaStmt->newschema)); +} diff --git a/src/backend/distributed/deparser/objectaddress.c b/src/backend/distributed/deparser/objectaddress.c index 98a7b8aeb..5624bd503 100644 --- a/src/backend/distributed/deparser/objectaddress.c +++ b/src/backend/distributed/deparser/objectaddress.c @@ -12,8 +12,11 @@ #include "postgres.h" +#include "commands/extension.h" #include "distributed/commands.h" #include "distributed/deparser.h" +#include "catalog/objectaddress.h" +#include "catalog/pg_extension_d.h" static const ObjectAddress * AlterTableStmtObjectAddress(AlterTableStmt *stmt, bool missing_ok); @@ -26,7 +29,11 @@ static const ObjectAddress * AlterOwnerStmtObjectAddress(AlterOwnerStmt *stmt, bool missing_ok); static const ObjectAddress * AlterObjectDependsStmtObjectAddress( AlterObjectDependsStmt *stmt, bool missing_ok); - +static const ObjectAddress * CreateExtensionStmtObjectAddress(CreateExtensionStmt *stmt, + bool missing_ok); +static const ObjectAddress * AlterExtensionStmtObjectAddress( + AlterExtensionStmt *alterExtensionStmt, bool + missing_ok); /* * GetObjectAddressFromParseTree returns the ObjectAdderss of the main target of the parse @@ -109,6 +116,18 @@ GetObjectAddressFromParseTree(Node *parseTree, bool missing_ok) return NULL; } + case T_CreateExtensionStmt: + { + return CreateExtensionStmtObjectAddress(castNode(CreateExtensionStmt, + parseTree), missing_ok); + } + + case T_AlterExtensionStmt: + { + return AlterExtensionStmtObjectAddress(castNode(AlterExtensionStmt, + parseTree), missing_ok); + } + default: { /* @@ -189,6 +208,11 @@ AlterObjectSchemaStmtObjectAddress(AlterObjectSchemaStmt *stmt, bool missing_ok) return AlterFunctionSchemaStmtObjectAddress(stmt, missing_ok); } + case OBJECT_EXTENSION: + { + return AlterExtensionSchemaStmtObjectAddress(stmt, missing_ok); + } + default: { ereport(ERROR, (errmsg("unsupported alter schema statement to get object " @@ -273,3 +297,67 @@ AlterObjectDependsStmtObjectAddress(AlterObjectDependsStmt *stmt, bool missing_o } } } + + +/* + * CreateExtensionStmtObjectAddress finds the ObjectAddress for the extension described + * by the CreateExtensionStmt. If missing_ok is false, then this function throws an + * error if the extension does not exist. + * + * Never returns NULL, but the objid in the address could be invalid if missing_ok was set + * to true. + */ +static const ObjectAddress * +CreateExtensionStmtObjectAddress(CreateExtensionStmt *createExtensionStmt, bool + missing_ok) +{ + ObjectAddress *address = palloc0(sizeof(ObjectAddress)); + + const char *extensionName = createExtensionStmt->extname; + + Oid extensionoid = get_extension_oid(extensionName, missing_ok); + + /* if we couldn't find the extension, error if missing_ok is false */ + if (!missing_ok && extensionoid == InvalidOid) + { + ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("extension \"%s\" does not exist", + extensionName))); + } + + ObjectAddressSet(*address, ExtensionRelationId, extensionoid); + + return address; +} + + +/* + * AlterExtensionStmtObjectAddress finds the ObjectAddress for the extension described + * by the AlterExtensionStmt. If missing_ok is false, then this function throws an + * error if the extension is not created before. + * + * Never returns NULL, but the objid in the address could be invalid if missing_ok was set + * to true. + */ +static const ObjectAddress * +AlterExtensionStmtObjectAddress(AlterExtensionStmt *alterExtensionStmt, bool + missing_ok) +{ + ObjectAddress *address = palloc0(sizeof(ObjectAddress)); + + const char *extensionName = alterExtensionStmt->extname; + + Oid extensionoid = get_extension_oid(extensionName, missing_ok); + + /* if we couldn't find the extension, error if missing_ok is false */ + if (!missing_ok && extensionoid == InvalidOid) + { + ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("extension \"%s\" does not exist", + extensionName))); + } + + ObjectAddressSet(*address, ExtensionRelationId, extensionoid); + + return address; +} diff --git a/src/backend/distributed/metadata/dependency.c b/src/backend/distributed/metadata/dependency.c index 8e68554e4..bae589c63 100644 --- a/src/backend/distributed/metadata/dependency.c +++ b/src/backend/distributed/metadata/dependency.c @@ -403,6 +403,11 @@ SupportedDependencyByCitus(const ObjectAddress *address) return true; } + case OCLASS_EXTENSION: + { + return true; + } + case OCLASS_TYPE: { switch (get_typtype(address->objectId)) diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 53b8c4a0b..ae1bbbaec 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -23,12 +23,30 @@ /* cluster.c - forward declarations */ extern List * PlanClusterStmt(ClusterStmt *clusterStmt, const char *clusterCommand); + /* call.c */ extern bool CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest); + /* extension.c - forward declarations */ -extern bool IsCitusExtensionStmt(Node *parsetree); +extern bool IsCreateAlterExtensionUpdateCitusStmt(Node *parsetree); extern void ErrorIfUnstableCreateOrAlterExtensionStmt(Node *parsetree); +extern List * PlanCreateExtensionStmt(CreateExtensionStmt *stmt, const char *queryString); +extern void ProcessCreateExtensionStmt(CreateExtensionStmt *stmt, const + char *queryString); +extern List * PlanDropExtensionStmt(DropStmt *stmt, const char *queryString); +extern List * PlanAlterExtensionSchemaStmt(AlterObjectSchemaStmt *alterExtensionStmt, + const char *queryString); +extern void ProcessAlterExtensionSchemaStmt(AlterObjectSchemaStmt *alterExtensionStmt, + const char *queryString); +extern List * PlanAlterExtensionUpdateStmt(AlterExtensionStmt *alterExtensionStmt, const + char *queryString); +extern List * CreateExtensionDDLCommand(const ObjectAddress *extensionAddress); +extern const ObjectAddress * AlterExtensionSchemaStmtObjectAddress( + AlterObjectSchemaStmt *stmt, bool missing_ok); +extern const ObjectAddress * AlterExtensionUpdateStmtObjectAddress( + AlterExtensionStmt *alterExtensionStmt, + bool missing_ok); /* foreign_constraint.c - forward declarations */ diff --git a/src/include/distributed/deparser.h b/src/include/distributed/deparser.h index a659432f3..9dc4ab896 100644 --- a/src/include/distributed/deparser.h +++ b/src/include/distributed/deparser.h @@ -32,6 +32,7 @@ extern void AssertObjectTypeIsFunctional(ObjectType type); extern void QualifyTreeNode(Node *stmt); extern const char * DeparseTreeNode(Node *stmt); +/* forward declarations for deparse_type_stmts.c */ extern const char * DeparseCompositeTypeStmt(CompositeTypeStmt *stmt); extern const char * DeparseCreateEnumStmt(CreateEnumStmt *stmt); extern const char * DeparseDropTypeStmt(DropStmt *stmt); @@ -69,8 +70,16 @@ extern void QualifyAlterFunctionSchemaStmt(AlterObjectSchemaStmt *stmt); extern void QualifyAlterFunctionOwnerStmt(AlterOwnerStmt *stmt); extern void QualifyAlterFunctionDependsStmt(AlterObjectDependsStmt *stmt); - /* forward declarations for deparse_role_stmts.c */ extern const char * DeparseAlterRoleStmt(AlterRoleStmt *stmt); +/* forward declarations for deparse_extension_stmts.c */ +extern Value * GetExtensionOption(List *extensionOptions, const + char *defname); +extern const char * DeparseCreateExtensionStmt(CreateExtensionStmt *stmt); +extern const char * DeparseDropExtensionStmt(DropStmt *stmt); +extern const char * DeparseAlterExtensionSchemaStmt( + AlterObjectSchemaStmt *alterExtensionSchemaStmt); +extern const char * DeparseAlterExtensionStmt(AlterExtensionStmt *alterExtensionStmt); + #endif /* CITUS_DEPARSER_H */ diff --git a/src/test/regress/expected/isolation_dump_global_wait_edges.out b/src/test/regress/expected/isolation_dump_global_wait_edges.out index b753c4ab3..3ffdf2b28 100644 --- a/src/test/regress/expected/isolation_dump_global_wait_edges.out +++ b/src/test/regress/expected/isolation_dump_global_wait_edges.out @@ -29,11 +29,11 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -289 288 f +340 339 f transactionnumberwaitingtransactionnumbers -288 -289 288 +339 +340 339 step s1-abort: ABORT; @@ -77,14 +77,14 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -293 292 f -294 292 f -294 293 t +344 343 f +345 343 f +345 344 t transactionnumberwaitingtransactionnumbers -292 -293 292 -294 292,293 +343 +344 343 +345 343,344 step s1-abort: ABORT; diff --git a/src/test/regress/expected/isolation_extension_commands.out b/src/test/regress/expected/isolation_extension_commands.out new file mode 100644 index 000000000..238b4ac4b --- /dev/null +++ b/src/test/regress/expected/isolation_extension_commands.out @@ -0,0 +1,709 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-begin s1-add-node-1 s2-create-extension-version-11 s1-commit s1-print +?column? + +1 +step s1-begin: + BEGIN; + +step s1-add-node-1: + SELECT 1 FROM master_add_node('localhost', 57637); + +?column? + +1 +step s2-create-extension-version-11: + CREATE extension seg VERSION "1.1"; + +step s1-commit: + COMMIT; + +step s2-create-extension-version-11: <... completed> +step s1-print: + select count(*) from citus.pg_dist_object ; + select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3; + SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$); + +count + +1 +extname extversion nspname + +citus 9.1-1 pg_catalog +plpgsql 1.0 pg_catalog +seg 1.1 public +run_command_on_workers + +(localhost,57637,t,seg) +(localhost,57638,t,seg) +run_command_on_workers + +(localhost,57637,t,1.1) +(localhost,57638,t,1.1) +run_command_on_workers + +(localhost,57637,t,public) +(localhost,57638,t,public) +master_remove_node + + + + +starting permutation: s1-begin s1-add-node-1 s2-alter-extension-update-to-version-12 s1-commit s1-print +?column? + +1 +step s1-begin: + BEGIN; + +step s1-add-node-1: + SELECT 1 FROM master_add_node('localhost', 57637); + +?column? + +1 +step s2-alter-extension-update-to-version-12: + ALTER extension seg update to "1.2"; + +step s1-commit: + COMMIT; + +step s2-alter-extension-update-to-version-12: <... completed> +step s1-print: + select count(*) from citus.pg_dist_object ; + select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3; + SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$); + +count + +1 +extname extversion nspname + +citus 9.1-1 pg_catalog +plpgsql 1.0 pg_catalog +seg 1.2 public +run_command_on_workers + +(localhost,57637,t,seg) +(localhost,57638,t,seg) +run_command_on_workers + +(localhost,57637,t,1.2) +(localhost,57638,t,1.2) +run_command_on_workers + +(localhost,57637,t,public) +(localhost,57638,t,public) +master_remove_node + + + + +starting permutation: s1-add-node-1 s1-begin s1-remove-node-1 s2-drop-extension s1-commit s1-print +?column? + +1 +step s1-add-node-1: + SELECT 1 FROM master_add_node('localhost', 57637); + +?column? + +1 +step s1-begin: + BEGIN; + +step s1-remove-node-1: + SELECT 1 FROM master_remove_node('localhost', 57637); + +?column? + +1 +step s2-drop-extension: + drop extension seg; + +step s1-commit: + COMMIT; + +step s2-drop-extension: <... completed> +step s1-print: + select count(*) from citus.pg_dist_object ; + select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3; + SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$); + +count + +0 +extname extversion nspname + +citus 9.1-1 pg_catalog +plpgsql 1.0 pg_catalog +run_command_on_workers + +(localhost,57638,t,"") +run_command_on_workers + +(localhost,57638,t,"") +run_command_on_workers + +(localhost,57638,t,"") +master_remove_node + + + +starting permutation: s1-begin s1-add-node-1 s2-create-extension-with-schema1 s1-commit s1-print +?column? + +1 +step s1-begin: + BEGIN; + +step s1-add-node-1: + SELECT 1 FROM master_add_node('localhost', 57637); + +?column? + +1 +step s2-create-extension-with-schema1: + create schema if not exists schema1; + CREATE extension seg with schema schema1; + +step s1-commit: + COMMIT; + +step s1-print: + select count(*) from citus.pg_dist_object ; + select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3; + SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$); + +count + +0 +extname extversion nspname + +citus 9.1-1 pg_catalog +plpgsql 1.0 pg_catalog +seg 1.3 schema1 +run_command_on_workers + +(localhost,57637,t,seg) +(localhost,57638,t,"") +run_command_on_workers + +(localhost,57637,t,1.2) +(localhost,57638,t,"") +run_command_on_workers + +(localhost,57637,t,public) +(localhost,57638,t,"") +master_remove_node + + + + +starting permutation: s1-begin s1-add-node-1 s2-drop-extension-cascade s1-commit s1-print +?column? + +1 +step s1-begin: + BEGIN; + +step s1-add-node-1: + SELECT 1 FROM master_add_node('localhost', 57637); + +?column? + +1 +step s2-drop-extension-cascade: + drop extension seg cascade; + +step s1-commit: + COMMIT; + +step s1-print: + select count(*) from citus.pg_dist_object ; + select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3; + SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$); + +count + +0 +extname extversion nspname + +citus 9.1-1 pg_catalog +plpgsql 1.0 pg_catalog +run_command_on_workers + +(localhost,57637,t,seg) +(localhost,57638,t,"") +run_command_on_workers + +(localhost,57637,t,1.2) +(localhost,57638,t,"") +run_command_on_workers + +(localhost,57637,t,public) +(localhost,57638,t,"") +master_remove_node + + + + +starting permutation: s1-add-node-1 s1-create-extension-with-schema2 s1-begin s1-remove-node-1 s2-alter-extension-set-schema4 s1-commit s1-print +?column? + +1 +step s1-add-node-1: + SELECT 1 FROM master_add_node('localhost', 57637); + +?column? + +1 +step s1-create-extension-with-schema2: + create schema if not exists schema2; + CREATE extension seg with schema schema2; + +step s1-begin: + BEGIN; + +step s1-remove-node-1: + SELECT 1 FROM master_remove_node('localhost', 57637); + +?column? + +1 +step s2-alter-extension-set-schema4: + CREATE schema if not exists schema4; + alter extension seg set schema schema4; + +step s1-commit: + COMMIT; + +step s2-alter-extension-set-schema4: <... completed> +error in steps s1-commit s2-alter-extension-set-schema4: ERROR: extension "seg" does not exist +step s1-print: + select count(*) from citus.pg_dist_object ; + select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3; + SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$); + +count + +0 +extname extversion nspname + +citus 9.1-1 pg_catalog +plpgsql 1.0 pg_catalog +seg 1.3 schema2 +run_command_on_workers + +(localhost,57638,t,"") +run_command_on_workers + +(localhost,57638,t,"") +run_command_on_workers + +(localhost,57638,t,"") +master_remove_node + + + +starting permutation: s1-add-node-1 s1-begin s1-remove-node-1 s2-create-extension-with-schema1 s1-commit s1-print +?column? + +1 +step s1-add-node-1: + SELECT 1 FROM master_add_node('localhost', 57637); + +?column? + +1 +step s1-begin: + BEGIN; + +step s1-remove-node-1: + SELECT 1 FROM master_remove_node('localhost', 57637); + +?column? + +1 +step s2-create-extension-with-schema1: + create schema if not exists schema1; + CREATE extension seg with schema schema1; + +ERROR: extension "seg" already exists +step s1-commit: + COMMIT; + +step s1-print: + select count(*) from citus.pg_dist_object ; + select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3; + SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$); + +count + +0 +extname extversion nspname + +citus 9.1-1 pg_catalog +plpgsql 1.0 pg_catalog +seg 1.3 schema2 +run_command_on_workers + +(localhost,57638,t,"") +run_command_on_workers + +(localhost,57638,t,"") +run_command_on_workers + +(localhost,57638,t,"") +master_remove_node + + + +starting permutation: s2-drop-extension s2-begin s2-create-extension-version-11 s1-add-node-1 s2-commit s1-print +?column? + +1 +step s2-drop-extension: + drop extension seg; + +step s2-begin: + BEGIN; + +step s2-create-extension-version-11: + CREATE extension seg VERSION "1.1"; + +step s1-add-node-1: + SELECT 1 FROM master_add_node('localhost', 57637); + +?column? + +1 +step s2-commit: + COMMIT; + +step s1-print: + select count(*) from citus.pg_dist_object ; + select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3; + SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$); + +count + +0 +extname extversion nspname + +citus 9.1-1 pg_catalog +plpgsql 1.0 pg_catalog +seg 1.1 public +run_command_on_workers + +(localhost,57637,t,seg) +(localhost,57638,t,"") +run_command_on_workers + +(localhost,57637,t,1.2) +(localhost,57638,t,"") +run_command_on_workers + +(localhost,57637,t,public) +(localhost,57638,t,"") +master_remove_node + + + + +starting permutation: s2-drop-extension s2-create-extension-version-11 s2-begin s2-alter-extension-update-to-version-12 s1-add-node-1 s2-commit s1-print +?column? + +1 +step s2-drop-extension: + drop extension seg; + +step s2-create-extension-version-11: + CREATE extension seg VERSION "1.1"; + +step s2-begin: + BEGIN; + +step s2-alter-extension-update-to-version-12: + ALTER extension seg update to "1.2"; + +step s1-add-node-1: + SELECT 1 FROM master_add_node('localhost', 57637); + +step s2-commit: + COMMIT; + +step s1-add-node-1: <... completed> +?column? + +1 +step s1-print: + select count(*) from citus.pg_dist_object ; + select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3; + SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$); + +count + +1 +extname extversion nspname + +citus 9.1-1 pg_catalog +plpgsql 1.0 pg_catalog +seg 1.2 public +run_command_on_workers + +(localhost,57637,t,seg) +(localhost,57638,t,seg) +run_command_on_workers + +(localhost,57637,t,1.2) +(localhost,57638,t,1.2) +run_command_on_workers + +(localhost,57637,t,public) +(localhost,57638,t,public) +master_remove_node + + + + +starting permutation: s2-add-node-1 s2-begin s2-drop-extension s1-remove-node-1 s2-commit s1-print +?column? + +1 +step s2-add-node-1: + SELECT 1 FROM master_add_node('localhost', 57637); + +?column? + +1 +step s2-begin: + BEGIN; + +step s2-drop-extension: + drop extension seg; + +step s1-remove-node-1: + SELECT 1 FROM master_remove_node('localhost', 57637); + +step s2-commit: + COMMIT; + +step s1-remove-node-1: <... completed> +?column? + +1 +step s1-print: + select count(*) from citus.pg_dist_object ; + select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3; + SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$); + +count + +0 +extname extversion nspname + +citus 9.1-1 pg_catalog +plpgsql 1.0 pg_catalog +run_command_on_workers + +(localhost,57638,t,"") +run_command_on_workers + +(localhost,57638,t,"") +run_command_on_workers + +(localhost,57638,t,"") +master_remove_node + + + +starting permutation: s2-begin s2-create-extension-with-schema1 s1-add-node-1 s2-commit s1-print +?column? + +1 +step s2-begin: + BEGIN; + +step s2-create-extension-with-schema1: + create schema if not exists schema1; + CREATE extension seg with schema schema1; + +step s1-add-node-1: + SELECT 1 FROM master_add_node('localhost', 57637); + +?column? + +1 +step s2-commit: + COMMIT; + +step s1-print: + select count(*) from citus.pg_dist_object ; + select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3; + SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$); + +count + +0 +extname extversion nspname + +citus 9.1-1 pg_catalog +plpgsql 1.0 pg_catalog +seg 1.3 schema1 +run_command_on_workers + +(localhost,57637,t,"") +(localhost,57638,t,"") +run_command_on_workers + +(localhost,57637,t,"") +(localhost,57638,t,"") +run_command_on_workers + +(localhost,57637,t,"") +(localhost,57638,t,"") +master_remove_node + + + + +starting permutation: s2-add-node-1 s2-drop-extension-cascade s2-create-extension-with-schema2 s2-begin s2-alter-extension-version-13 s1-remove-node-1 s2-commit s1-print +?column? + +1 +step s2-add-node-1: + SELECT 1 FROM master_add_node('localhost', 57637); + +?column? + +1 +step s2-drop-extension-cascade: + drop extension seg cascade; + +step s2-create-extension-with-schema2: + create schema if not exists schema2; + CREATE extension seg with schema schema2; + +step s2-begin: + BEGIN; + +step s2-alter-extension-version-13: + ALTER extension seg update to "1.3"; + +ERROR: extension "seg" does not exist +step s1-remove-node-1: + SELECT 1 FROM master_remove_node('localhost', 57637); + +?column? + +1 +step s2-commit: + COMMIT; + +step s1-print: + select count(*) from citus.pg_dist_object ; + select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3; + SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$); + +count + +0 +extname extversion nspname + +citus 9.1-1 pg_catalog +plpgsql 1.0 pg_catalog +seg 1.3 schema2 +run_command_on_workers + +(localhost,57638,t,"") +run_command_on_workers + +(localhost,57638,t,"") +run_command_on_workers + +(localhost,57638,t,"") +master_remove_node + + + +starting permutation: s2-drop-extension s2-add-node-1 s2-begin s2-create-extension-version-11 s1-remove-node-1 s2-commit s1-print +?column? + +1 +step s2-drop-extension: + drop extension seg; + +step s2-add-node-1: + SELECT 1 FROM master_add_node('localhost', 57637); + +?column? + +1 +step s2-begin: + BEGIN; + +step s2-create-extension-version-11: + CREATE extension seg VERSION "1.1"; + +step s1-remove-node-1: + SELECT 1 FROM master_remove_node('localhost', 57637); + +?column? + +1 +step s2-commit: + COMMIT; + +step s1-print: + select count(*) from citus.pg_dist_object ; + select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3; + SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$); + +count + +0 +extname extversion nspname + +citus 9.1-1 pg_catalog +plpgsql 1.0 pg_catalog +seg 1.1 public +run_command_on_workers + +(localhost,57638,t,"") +run_command_on_workers + +(localhost,57638,t,"") +run_command_on_workers + +(localhost,57638,t,"") +master_remove_node + + diff --git a/src/test/regress/expected/propagate_extension_commands.out b/src/test/regress/expected/propagate_extension_commands.out new file mode 100644 index 000000000..c5a5c75e2 --- /dev/null +++ b/src/test/regress/expected/propagate_extension_commands.out @@ -0,0 +1,392 @@ +CREATE SCHEMA "extension'test"; +-- use a schema name with escape character +SET search_path TO "extension'test"; +-- create an extension on the given search_path +-- the extension is on contrib, so should be avaliable for the regression tests +CREATE EXTENSION seg; +-- make sure that both the schema and the extension is distributed +SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname = 'seg'); + count +------- + 1 +(1 row) + +SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_namespace WHERE nspname = 'extension''test'); + count +------- + 1 +(1 row) + +CREATE TABLE test_table (key int, value seg); +SELECT create_distributed_table('test_table', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +-- make sure that the table is also distributed now +SELECT count(*) from pg_dist_partition where logicalrelid='extension''test.test_table'::regclass; + count +------- + 1 +(1 row) + +CREATE TYPE two_segs AS (seg_1 seg, seg_2 seg); +-- verify that the type that depends on the extension is also marked as distributed +SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_type WHERE typname = 'two_segs' AND typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'extension''test')); + count +------- + 1 +(1 row) + +-- now try to run CREATE EXTENSION within a transction block, all should work fine +BEGIN; + CREATE EXTENSION isn WITH SCHEMA public; + -- now, try create a reference table relying on the data types + -- this should not succeed as we do not distribute extension commands within transaction blocks + CREATE TABLE dist_table (key int, value public.issn); + SELECT create_distributed_table('dist_table', 'key'); + create_distributed_table +-------------------------- + +(1 row) + + + -- we can even run queries (sequentially) over the distributed table + SELECT * FROM dist_table; + key | value +-----+------- +(0 rows) + + INSERT INTO dist_table VALUES (1, public.issn('1436-4522')); + INSERT INTO dist_table SELECT * FROM dist_table RETURNING *; + key | value +-----+----------- + 1 | 1436-4522 +(1 row) + +COMMIT; +-- make sure that the extension is distributed even if we run create extension in a transaction block +SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname = 'isn'); + count +------- + 1 +(1 row) + +SELECT run_command_on_workers($$SELECT count(*) FROM pg_extension WHERE extname = 'isn'$$); + run_command_on_workers +------------------------ + (localhost,57637,t,1) + (localhost,57638,t,1) +(2 rows) + +CREATE TABLE ref_table (a public.issn); +-- now, create a reference table relying on the data types +SELECT create_reference_table('ref_table'); + create_reference_table +------------------------ + +(1 row) + +-- now, drop the extension, recreate it with an older version and update it to latest version +SET client_min_messages TO WARNING; +DROP EXTENSION isn CASCADE; +CREATE EXTENSION isn WITH VERSION "1.1"; +RESET client_min_messages; +-- before updating the version, ensure the current version +SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extname = 'isn'$$); + run_command_on_workers +------------------------- + (localhost,57637,t,1.1) + (localhost,57638,t,1.1) +(2 rows) + +-- now, update to a newer version +ALTER EXTENSION isn UPDATE TO '1.2'; +-- show that ALTER EXTENSION is propagated +SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extname = 'isn'$$); + run_command_on_workers +------------------------- + (localhost,57637,t,1.2) + (localhost,57638,t,1.2) +(2 rows) + +-- before changing the schema, ensure the current schmea +SELECT run_command_on_workers($$SELECT nspname from pg_namespace where oid=(SELECT extnamespace FROM pg_extension WHERE extname = 'isn')$$); + run_command_on_workers +------------------------------------ + (localhost,57637,t,extension'test) + (localhost,57638,t,extension'test) +(2 rows) + +-- now change the schema +ALTER EXTENSION isn SET SCHEMA public; +-- switch back to public schema as we set extension's schema to public +SET search_path TO public; +-- make sure that the extension is distributed +SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname = 'isn'); + count +------- + 1 +(1 row) + +-- show that the ALTER EXTENSION command is propagated +SELECT run_command_on_workers($$SELECT nspname from pg_namespace where oid=(SELECT extnamespace FROM pg_extension WHERE extname = 'isn')$$); + run_command_on_workers +---------------------------- + (localhost,57637,t,public) + (localhost,57638,t,public) +(2 rows) + +-- SET client_min_messages TO WARNING before executing a DROP EXTENSION statement +SET client_min_messages TO WARNING; +-- drop the extension finally +DROP EXTENSION isn CASCADE; +-- restore client_min_messages after DROP EXTENSION +RESET client_min_messages; +-- now make sure that the reference tables depending on an extension can be succesfully created. +-- we should also ensure that we replicate this reference table (and hence the extension) +-- to new nodes after calling master_activate_node. +-- now, first drop seg and existing objects before next test +SET client_min_messages TO WARNING; +DROP EXTENSION seg CASCADE; +-- but as we have only 2 ports in postgresql tests, let's remove one of the nodes first +-- before remove, first remove the existing relations (due to the other tests) +DROP SCHEMA "extension'test" CASCADE; +RESET client_min_messages; +SELECT 1 from master_remove_node('localhost', :worker_2_port); + ?column? +---------- + 1 +(1 row) + +-- then create the extension +CREATE EXTENSION seg; +-- show that the extension is created on existing worker +SELECT run_command_on_workers($$SELECT count(extnamespace) FROM pg_extension WHERE extname = 'seg'$$); + run_command_on_workers +------------------------ + (localhost,57637,t,1) +(1 row) + +SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extname = 'seg'$$); + run_command_on_workers +------------------------- + (localhost,57637,t,1.3) +(1 row) + +-- now create the reference table +CREATE TABLE ref_table_2 (x seg); +SELECT create_reference_table('ref_table_2'); + create_reference_table +------------------------ + +(1 row) + +-- and add the other node +SELECT 1 from master_add_node('localhost', 57638); +NOTICE: Replicating reference table "ref_table_2" to the node localhost:57638 + ?column? +---------- + 1 +(1 row) + +-- show that the extension is created on both existing and new node +SELECT run_command_on_workers($$SELECT count(extnamespace) FROM pg_extension WHERE extname = 'seg'$$); + run_command_on_workers +------------------------ + (localhost,57637,t,1) + (localhost,57638,t,1) +(2 rows) + +SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extname = 'seg'$$); + run_command_on_workers +------------------------- + (localhost,57637,t,1.3) + (localhost,57638,t,1.3) +(2 rows) + +-- and similarly check for the reference table +select count(*) from pg_dist_partition where partmethod='n' and logicalrelid='ref_table_2'::regclass; + count +------- + 1 +(1 row) + +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='ref_table_2'::regclass; + count +------- + 1 +(1 row) + +DROP TABLE ref_table_2; +-- now test create extension in another transaction block but rollback this time +BEGIN; + CREATE EXTENSION isn WITH VERSION '1.1' SCHEMA public; +ROLLBACK; +-- at the end of the transaction block, we did not create isn extension in coordinator or worker nodes as we rollback'ed +-- make sure that the extension is not distributed +SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname = 'isn'); + count +------- + 0 +(1 row) + +-- and the extension does not exist on workers +SELECT run_command_on_workers($$SELECT count(*) FROM pg_extension WHERE extname = 'isn'$$); + run_command_on_workers +------------------------ + (localhost,57637,t,0) + (localhost,57638,t,0) +(2 rows) + +-- give a notice for the following commands saying that it is not +-- propagated to the workers. the user should run it manually on the workers +CREATE TABLE t1 (A int); +CREATE VIEW v1 AS select * from t1; +ALTER EXTENSION seg ADD VIEW v1; +NOTICE: Citus does not propagate adding/dropping member objects +HINT: You can add/drop the member objects on the workers as well. +ALTER EXTENSION seg DROP VIEW v1; +NOTICE: Citus does not propagate adding/dropping member objects +HINT: You can add/drop the member objects on the workers as well. +DROP VIEW v1; +DROP TABLE t1; +-- drop multiple extensions at the same time +CREATE EXTENSION isn WITH VERSION '1.1' SCHEMA public; +-- let's create another extension locally +set citus.enable_ddl_propagation to 'off'; +CREATE EXTENSION pg_buffercache; +set citus.enable_ddl_propagation to 'on'; +-- SET client_min_messages TO WARNING before executing a DROP EXTENSION statement +SET client_min_messages TO WARNING; +DROP EXTENSION pg_buffercache, isn CASCADE; +SELECT count(*) FROM pg_extension WHERE extname IN ('pg_buffercache', 'isn'); + count +------- + 0 +(1 row) + +-- restore client_min_messages after DROP EXTENSION +RESET client_min_messages; +-- SET client_min_messages TO WARNING before executing a DROP EXTENSION statement +SET client_min_messages TO WARNING; +-- drop extension should just work +DROP EXTENSION seg CASCADE; +SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname = 'seg'); + count +------- + 0 +(1 row) + +SELECT run_command_on_workers($$SELECT count(*) FROM pg_extension WHERE extname = 'seg'$$); + run_command_on_workers +------------------------ + (localhost,57637,t,0) + (localhost,57638,t,0) +(2 rows) + +-- restore client_min_messages after DROP EXTENSION +RESET client_min_messages; +-- make sure that the extension is not avaliable anymore as a distributed object +SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname IN ('seg', 'isn')); + count +------- + 0 +(1 row) + +CREATE SCHEMA "extension'test"; +SET search_path TO "extension'test"; +-- check restriction for sequential execution +-- enable it and see that create command errors but continues its execution by changing citus.multi_shard_modify_mode TO 'off +BEGIN; + CREATE TABLE some_random_table (a int); + SELECT create_distributed_table('some_random_table', 'a'); + create_distributed_table +-------------------------- + +(1 row) + + CREATE EXTENSION seg; + CREATE TABLE some_random_table_2 (a int, b seg); + SELECT create_distributed_table('some_random_table_2', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +ROLLBACK; +-- show that the CREATE EXTENSION command propagated even if the transaction +-- block is rollbacked, that's a shortcoming of dependency creation logic +SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extname = 'seg'$$); + run_command_on_workers +------------------------- + (localhost,57637,t,1.3) + (localhost,57638,t,1.3) +(2 rows) + +-- drop the schema and all the objects +SET client_min_messages TO WARNING; +DROP SCHEMA "extension'test" CASCADE; +-- recreate for the next tests +CREATE SCHEMA "extension'test"; +-- use a schema name with escape character +SET search_path TO "extension'test"; +RESET client_min_messages; +-- remove the node, we'll add back again +SELECT 1 from master_remove_node('localhost', :worker_2_port); + ?column? +---------- + 1 +(1 row) + +-- now, create a type that depends on another type, which +-- finally depends on an extension +BEGIN; + SET citus.shard_replication_factor TO 1; + CREATE EXTENSION seg; + CREATE EXTENSION isn; + CREATE TYPE test_type AS (a int, b seg); + CREATE TYPE test_type_2 AS (a int, b test_type); + CREATE TABLE t2 (a int, b test_type_2, c issn); + SELECT create_distributed_table('t2', 'a'); + create_distributed_table +-------------------------- + +(1 row) + + + CREATE TYPE test_type_3 AS (a int, b test_type, c issn); + CREATE TABLE t3 (a int, b test_type_3); + SELECT create_reference_table('t3'); + create_reference_table +------------------------ + +(1 row) + +COMMIT; +-- add the node back +SELECT 1 from master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "t3" to the node localhost:57638 + ?column? +---------- + 1 +(1 row) + +-- make sure that both extensions are created on both nodes +SELECT count(*) FROM citus.pg_dist_object WHERE objid IN (SELECT oid FROM pg_extension WHERE extname IN ('seg', 'isn')); + count +------- + 2 +(1 row) + +SELECT run_command_on_workers($$SELECT count(*) FROM pg_extension WHERE extname IN ('seg', 'isn')$$); + run_command_on_workers +------------------------ + (localhost,57637,t,2) + (localhost,57638,t,2) +(2 rows) + +-- drop the schema and all the objects +SET client_min_messages TO WARNING; +DROP SCHEMA "extension'test" CASCADE; diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 58d39b259..780b3647c 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -7,6 +7,8 @@ test: isolation_master_update_node test: isolation_ensure_dependency_activate_node test: isolation_shouldhaveshards +test: isolation_extension_commands + # tests that change node metadata should precede # isolation_cluster_management such that tests # that come later can be parallelized @@ -19,7 +21,6 @@ test: isolation_distributed_transaction_id test: isolation_dump_global_wait_edges test: isolation_citus_dist_activity - test: isolation_dml_vs_repair isolation_copy_placement_vs_copy_placement test: isolation_concurrent_dml isolation_data_migration diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 1e484c89a..91699fd7e 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -14,11 +14,16 @@ # --- # Tests around schema changes, these are run first, so there's no preexisting objects. +# +# propagate_extension_commands lies just after multi_cluster_management as we do +# remove / add node operations, we do not want any preexisting objects before +# propagate_extension_commands # --- test: multi_extension test: multi_703_upgrade test: multi_cluster_management test: alter_role_propagation +test: propagate_extension_commands test: multi_test_helpers test: multi_table_ddl test: multi_name_lengths diff --git a/src/test/regress/specs/isolation_extension_commands.spec b/src/test/regress/specs/isolation_extension_commands.spec new file mode 100644 index 000000000..f42633703 --- /dev/null +++ b/src/test/regress/specs/isolation_extension_commands.spec @@ -0,0 +1,154 @@ +setup +{ + SELECT 1 FROM master_add_node('localhost', 57638); +} + +teardown +{ + SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node; +} + +session "s1" + +step "s1-begin" +{ + BEGIN; +} + +step "s1-add-node-1" +{ + SELECT 1 FROM master_add_node('localhost', 57637); +} + +step "s1-activate-node-1" +{ + SELECT 1 FROM master_activate_node('localhost', 57637); +} + +step "s1-disable-node-1" +{ + SELECT 1 FROM master_disable_node('localhost', 57637); +} + +step "s1-remove-node-1" +{ + SELECT 1 FROM master_remove_node('localhost', 57637); +} + +step "s1-commit" +{ + COMMIT; +} + +step "s1-alter-extension-12" +{ + alter extension seg update to "1.2"; +} + +step "s1-drop-extension-cascade" +{ + drop extension seg cascade; +} + +step "s1-create-extension-with-schema2" +{ + create schema if not exists schema2; + CREATE extension seg with schema schema2; +} + +step "s1-alter-extension-set-schema2" +{ + create schema if not exists schema2; + alter extension seg set schema schema2; +} + +step "s1-print" +{ + select count(*) from citus.pg_dist_object ; + select extname, extversion, nspname from pg_extension, pg_namespace where pg_namespace.oid=pg_extension.extnamespace order by 1,2,3; + SELECT run_command_on_workers($$select extname from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select extversion from pg_extension where extname='seg'$$); + SELECT run_command_on_workers($$select nspname from pg_extension, pg_namespace where extname='seg' and pg_extension.extnamespace=pg_namespace.oid$$); +} + +session "s2" + +step "s2-begin" +{ + BEGIN; +} + +step "s2-add-node-1" +{ + SELECT 1 FROM master_add_node('localhost', 57637); +} + +step "s2-create-extension-version-11" +{ + CREATE extension seg VERSION "1.1"; +} + +step "s2-alter-extension-version-13" +{ + ALTER extension seg update to "1.3"; +} + +step "s2-create-extension-with-schema1" +{ + create schema if not exists schema1; + CREATE extension seg with schema schema1; +} + +step "s2-create-extension-with-schema2" +{ + create schema if not exists schema2; + CREATE extension seg with schema schema2; +} + +step "s2-drop-extension" +{ + drop extension seg; +} + +step "s2-drop-extension-cascade" +{ + drop extension seg cascade; +} + +step "s2-alter-extension-update-to-version-12" +{ + ALTER extension seg update to "1.2"; +} + +step "s2-alter-extension-set-schema4" +{ + CREATE schema if not exists schema4; + alter extension seg set schema schema4; +} + +step "s2-alter-extension-set-schema-public" +{ + alter extension seg set schema public; +} + +step "s2-commit" +{ + COMMIT; +} + +# master_#_node vs extension command +permutation "s1-begin" "s1-add-node-1" "s2-create-extension-version-11" "s1-commit" "s1-print" +permutation "s1-begin" "s1-add-node-1" "s2-alter-extension-update-to-version-12" "s1-commit" "s1-print" +permutation "s1-add-node-1" "s1-begin" "s1-remove-node-1" "s2-drop-extension" "s1-commit" "s1-print" +permutation "s1-begin" "s1-add-node-1" "s2-create-extension-with-schema1" "s1-commit" "s1-print" +permutation "s1-begin" "s1-add-node-1" "s2-drop-extension-cascade" "s1-commit" "s1-print" +permutation "s1-add-node-1" "s1-create-extension-with-schema2" "s1-begin" "s1-remove-node-1" "s2-alter-extension-set-schema4" "s1-commit" "s1-print" +permutation "s1-add-node-1" "s1-begin" "s1-remove-node-1" "s2-create-extension-with-schema1" "s1-commit" "s1-print" + +# extension command vs master_#_node +permutation "s2-drop-extension" "s2-begin" "s2-create-extension-version-11" "s1-add-node-1" "s2-commit" "s1-print" +permutation "s2-drop-extension" "s2-create-extension-version-11" "s2-begin" "s2-alter-extension-update-to-version-12" "s1-add-node-1" "s2-commit" "s1-print" +permutation "s2-add-node-1" "s2-begin" "s2-drop-extension" "s1-remove-node-1" "s2-commit" "s1-print" +permutation "s2-begin" "s2-create-extension-with-schema1" "s1-add-node-1" "s2-commit" "s1-print" +permutation "s2-add-node-1" "s2-drop-extension-cascade" "s2-create-extension-with-schema2" "s2-begin" "s2-alter-extension-version-13" "s1-remove-node-1" "s2-commit" "s1-print" +permutation "s2-drop-extension" "s2-add-node-1" "s2-begin" "s2-create-extension-version-11" "s1-remove-node-1" "s2-commit" "s1-print" diff --git a/src/test/regress/sql/propagate_extension_commands.sql b/src/test/regress/sql/propagate_extension_commands.sql new file mode 100644 index 000000000..f8a29fb83 --- /dev/null +++ b/src/test/regress/sql/propagate_extension_commands.sql @@ -0,0 +1,234 @@ +CREATE SCHEMA "extension'test"; + +-- use a schema name with escape character +SET search_path TO "extension'test"; + +-- create an extension on the given search_path +-- the extension is on contrib, so should be avaliable for the regression tests +CREATE EXTENSION seg; + +-- make sure that both the schema and the extension is distributed +SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname = 'seg'); +SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_namespace WHERE nspname = 'extension''test'); + +CREATE TABLE test_table (key int, value seg); +SELECT create_distributed_table('test_table', 'key'); + +-- make sure that the table is also distributed now +SELECT count(*) from pg_dist_partition where logicalrelid='extension''test.test_table'::regclass; + +CREATE TYPE two_segs AS (seg_1 seg, seg_2 seg); + +-- verify that the type that depends on the extension is also marked as distributed +SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_type WHERE typname = 'two_segs' AND typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'extension''test')); + +-- now try to run CREATE EXTENSION within a transction block, all should work fine +BEGIN; + CREATE EXTENSION isn WITH SCHEMA public; + + -- now, try create a reference table relying on the data types + -- this should not succeed as we do not distribute extension commands within transaction blocks + CREATE TABLE dist_table (key int, value public.issn); + SELECT create_distributed_table('dist_table', 'key'); + + -- we can even run queries (sequentially) over the distributed table + SELECT * FROM dist_table; + INSERT INTO dist_table VALUES (1, public.issn('1436-4522')); + INSERT INTO dist_table SELECT * FROM dist_table RETURNING *; +COMMIT; + +-- make sure that the extension is distributed even if we run create extension in a transaction block +SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname = 'isn'); +SELECT run_command_on_workers($$SELECT count(*) FROM pg_extension WHERE extname = 'isn'$$); + + + +CREATE TABLE ref_table (a public.issn); +-- now, create a reference table relying on the data types +SELECT create_reference_table('ref_table'); + +-- now, drop the extension, recreate it with an older version and update it to latest version +SET client_min_messages TO WARNING; +DROP EXTENSION isn CASCADE; +CREATE EXTENSION isn WITH VERSION "1.1"; +RESET client_min_messages; + +-- before updating the version, ensure the current version +SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extname = 'isn'$$); + +-- now, update to a newer version +ALTER EXTENSION isn UPDATE TO '1.2'; + +-- show that ALTER EXTENSION is propagated +SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extname = 'isn'$$); + +-- before changing the schema, ensure the current schmea +SELECT run_command_on_workers($$SELECT nspname from pg_namespace where oid=(SELECT extnamespace FROM pg_extension WHERE extname = 'isn')$$); + +-- now change the schema +ALTER EXTENSION isn SET SCHEMA public; + +-- switch back to public schema as we set extension's schema to public +SET search_path TO public; + +-- make sure that the extension is distributed +SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname = 'isn'); + +-- show that the ALTER EXTENSION command is propagated +SELECT run_command_on_workers($$SELECT nspname from pg_namespace where oid=(SELECT extnamespace FROM pg_extension WHERE extname = 'isn')$$); + +-- SET client_min_messages TO WARNING before executing a DROP EXTENSION statement +SET client_min_messages TO WARNING; +-- drop the extension finally +DROP EXTENSION isn CASCADE; +-- restore client_min_messages after DROP EXTENSION +RESET client_min_messages; + +-- now make sure that the reference tables depending on an extension can be succesfully created. +-- we should also ensure that we replicate this reference table (and hence the extension) +-- to new nodes after calling master_activate_node. + +-- now, first drop seg and existing objects before next test +SET client_min_messages TO WARNING; +DROP EXTENSION seg CASCADE; + +-- but as we have only 2 ports in postgresql tests, let's remove one of the nodes first +-- before remove, first remove the existing relations (due to the other tests) + +DROP SCHEMA "extension'test" CASCADE; +RESET client_min_messages; +SELECT 1 from master_remove_node('localhost', :worker_2_port); + +-- then create the extension +CREATE EXTENSION seg; + +-- show that the extension is created on existing worker +SELECT run_command_on_workers($$SELECT count(extnamespace) FROM pg_extension WHERE extname = 'seg'$$); +SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extname = 'seg'$$); + +-- now create the reference table +CREATE TABLE ref_table_2 (x seg); +SELECT create_reference_table('ref_table_2'); + +-- and add the other node +SELECT 1 from master_add_node('localhost', 57638); + +-- show that the extension is created on both existing and new node +SELECT run_command_on_workers($$SELECT count(extnamespace) FROM pg_extension WHERE extname = 'seg'$$); +SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extname = 'seg'$$); + +-- and similarly check for the reference table +select count(*) from pg_dist_partition where partmethod='n' and logicalrelid='ref_table_2'::regclass; +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='ref_table_2'::regclass; +DROP TABLE ref_table_2; +-- now test create extension in another transaction block but rollback this time +BEGIN; + CREATE EXTENSION isn WITH VERSION '1.1' SCHEMA public; +ROLLBACK; + +-- at the end of the transaction block, we did not create isn extension in coordinator or worker nodes as we rollback'ed +-- make sure that the extension is not distributed +SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname = 'isn'); + +-- and the extension does not exist on workers +SELECT run_command_on_workers($$SELECT count(*) FROM pg_extension WHERE extname = 'isn'$$); + +-- give a notice for the following commands saying that it is not +-- propagated to the workers. the user should run it manually on the workers +CREATE TABLE t1 (A int); +CREATE VIEW v1 AS select * from t1; + +ALTER EXTENSION seg ADD VIEW v1; +ALTER EXTENSION seg DROP VIEW v1; +DROP VIEW v1; +DROP TABLE t1; + +-- drop multiple extensions at the same time +CREATE EXTENSION isn WITH VERSION '1.1' SCHEMA public; +-- let's create another extension locally +set citus.enable_ddl_propagation to 'off'; +CREATE EXTENSION pg_buffercache; +set citus.enable_ddl_propagation to 'on'; + +-- SET client_min_messages TO WARNING before executing a DROP EXTENSION statement +SET client_min_messages TO WARNING; +DROP EXTENSION pg_buffercache, isn CASCADE; +SELECT count(*) FROM pg_extension WHERE extname IN ('pg_buffercache', 'isn'); +-- restore client_min_messages after DROP EXTENSION +RESET client_min_messages; + +-- SET client_min_messages TO WARNING before executing a DROP EXTENSION statement +SET client_min_messages TO WARNING; + +-- drop extension should just work +DROP EXTENSION seg CASCADE; + +SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname = 'seg'); +SELECT run_command_on_workers($$SELECT count(*) FROM pg_extension WHERE extname = 'seg'$$); + +-- restore client_min_messages after DROP EXTENSION +RESET client_min_messages; + +-- make sure that the extension is not avaliable anymore as a distributed object +SELECT count(*) FROM citus.pg_dist_object WHERE objid = (SELECT oid FROM pg_extension WHERE extname IN ('seg', 'isn')); + +CREATE SCHEMA "extension'test"; +SET search_path TO "extension'test"; +-- check restriction for sequential execution +-- enable it and see that create command errors but continues its execution by changing citus.multi_shard_modify_mode TO 'off + +BEGIN; + CREATE TABLE some_random_table (a int); + SELECT create_distributed_table('some_random_table', 'a'); + CREATE EXTENSION seg; + CREATE TABLE some_random_table_2 (a int, b seg); + SELECT create_distributed_table('some_random_table_2', 'a'); +ROLLBACK; + +-- show that the CREATE EXTENSION command propagated even if the transaction +-- block is rollbacked, that's a shortcoming of dependency creation logic +SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extname = 'seg'$$); + +-- drop the schema and all the objects +SET client_min_messages TO WARNING; +DROP SCHEMA "extension'test" CASCADE; + +-- recreate for the next tests +CREATE SCHEMA "extension'test"; + +-- use a schema name with escape character +SET search_path TO "extension'test"; + +RESET client_min_messages; + +-- remove the node, we'll add back again +SELECT 1 from master_remove_node('localhost', :worker_2_port); + +-- now, create a type that depends on another type, which +-- finally depends on an extension +BEGIN; + SET citus.shard_replication_factor TO 1; + CREATE EXTENSION seg; + CREATE EXTENSION isn; + CREATE TYPE test_type AS (a int, b seg); + CREATE TYPE test_type_2 AS (a int, b test_type); + + CREATE TABLE t2 (a int, b test_type_2, c issn); + SELECT create_distributed_table('t2', 'a'); + + CREATE TYPE test_type_3 AS (a int, b test_type, c issn); + CREATE TABLE t3 (a int, b test_type_3); + SELECT create_reference_table('t3'); + +COMMIT; + +-- add the node back +SELECT 1 from master_add_node('localhost', :worker_2_port); + +-- make sure that both extensions are created on both nodes +SELECT count(*) FROM citus.pg_dist_object WHERE objid IN (SELECT oid FROM pg_extension WHERE extname IN ('seg', 'isn')); +SELECT run_command_on_workers($$SELECT count(*) FROM pg_extension WHERE extname IN ('seg', 'isn')$$); + +-- drop the schema and all the objects +SET client_min_messages TO WARNING; +DROP SCHEMA "extension'test" CASCADE;