diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index d8c3f0048..1bff7c295 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -24,7 +24,11 @@ #include "utils/lsyscache.h" static List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency); +static List * FilterObjectAddressListByPredicate(List *objectAddressList, + bool (*predicate)(const + ObjectAddress *)); +bool EnableDependencyCreation = true; /* * EnsureDependenciesExists finds all the dependencies that we support and makes sure @@ -207,6 +211,14 @@ ReplicateAllDependenciesToNode(const char *nodeName, int nodePort) */ dependencies = GetDistributedObjectAddressList(); + /* + * Depending on changes in the environment, such as the enable_object_propagation guc + * there might be objects in the distributed object address list that should currently + * not be propagated by citus as the are 'not supported'. + */ + dependencies = FilterObjectAddressListByPredicate(dependencies, + &SupportedDependencyByCitus); + /* * When dependency lists are getting longer we see a delay in the creation time on the * workers. We would like to inform the user. Currently we warn for lists greater then @@ -241,3 +253,27 @@ ReplicateAllDependenciesToNode(const char *nodeName, int nodePort) SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, CitusExtensionOwnerName(), ddlCommands); } + + +/* + * FilterObjectAddressListByPredicate takes a list of ObjectAddress *'s and returns a list + * only containing the ObjectAddress *'s for which the predicate returned true. + */ +static List * +FilterObjectAddressListByPredicate(List *objectAddressList, + bool (*predicate)(const ObjectAddress *)) +{ + List *result = NIL; + ListCell *objectAddressListCell = NULL; + + foreach(objectAddressListCell, objectAddressList) + { + ObjectAddress *address = (ObjectAddress *) lfirst(objectAddressListCell); + if (predicate(address)) + { + result = lappend(result, address); + } + } + + return result; +} diff --git a/src/backend/distributed/commands/type.c b/src/backend/distributed/commands/type.c index f4823031a..86d32979b 100644 --- a/src/backend/distributed/commands/type.c +++ b/src/backend/distributed/commands/type.c @@ -99,6 +99,7 @@ static CreateEnumStmt * RecreateEnumStmt(Oid typeOid); static List * EnumValsList(Oid typeOid); static bool ShouldPropagateTypeCreate(void); +static bool ShouldPropagateAlterType(const ObjectAddress *address); /* @@ -209,20 +210,8 @@ PlanAlterTypeStmt(AlterTableStmt *stmt, const char *queryString) Assert(stmt->relkind == OBJECT_TYPE); - if (creating_extension) - { - /* - * extensions should be created separately on the workers, types cascading from an - * extension should therefor not be propagated. - */ - return NIL; - } - - /* - * Only distributed types should be propagated - */ typeAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!IsObjectDistributed(typeAddress)) + if (!ShouldPropagateAlterType(typeAddress)) { return NIL; } @@ -343,17 +332,8 @@ PlanAlterEnumStmt(AlterEnumStmt *stmt, const char *queryString) const ObjectAddress *typeAddress = NULL; List *commands = NIL; - if (creating_extension) - { - /* - * extensions should be created separately on the workers, types cascading from an - * extension should therefor not be propagated here. - */ - return NIL; - } - typeAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!IsObjectDistributed(typeAddress)) + if (!ShouldPropagateAlterType(typeAddress)) { return NIL; } @@ -414,17 +394,8 @@ ProcessAlterEnumStmt(AlterEnumStmt *stmt, const char *queryString) { const ObjectAddress *typeAddress = NULL; - if (creating_extension) - { - /* - * extensions should be created separately on the workers, types cascading from an - * extension should therefor not be propagated here. - */ - return; - } - typeAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!IsObjectDistributed(typeAddress)) + if (!ShouldPropagateAlterType(typeAddress)) { return; } @@ -497,6 +468,14 @@ PlanDropTypeStmt(DropStmt *stmt, const char *queryString) return NIL; } + if (!EnableDependencyCreation) + { + /* + * we are configured to disable object propagation, should not propagate anything + */ + return NIL; + } + distributedTypes = FilterNameListForDistributedTypes(oldTypes, stmt->missing_ok); if (list_length(distributedTypes) <= 0) { @@ -556,25 +535,11 @@ PlanRenameTypeStmt(RenameStmt *stmt, const char *queryString) List *commands = NIL; typeAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!IsObjectDistributed(typeAddress)) + if (!ShouldPropagateAlterType(typeAddress)) { return NIL; } - /* - * we should not get to a point where an alter happens on a distributed type during an - * extension statement, but better safe then sorry. - */ - if (creating_extension) - { - /* - * extensions should be created separately on the workers, types cascading from an - * extension should therefor not be propagated here. - */ - return NIL; - } - - /* fully qualify */ QualifyTreeNode((Node *) stmt); @@ -603,31 +568,18 @@ List * PlanRenameTypeAttributeStmt(RenameStmt *stmt, const char *queryString) { const char *sql = NULL; - const ObjectAddress *address = NULL; + const ObjectAddress *typeAddress = NULL; List *commands = NIL; Assert(stmt->renameType == OBJECT_ATTRIBUTE); Assert(stmt->relationType == OBJECT_TYPE); - address = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!IsObjectDistributed(address)) + typeAddress = GetObjectAddressFromParseTree((Node *) stmt, false); + if (!ShouldPropagateAlterType(typeAddress)) { return NIL; } - /* - * we should not get to a point where an alter happens on a distributed type during an - * extension statement, but better safe then sorry. - */ - if (creating_extension) - { - /* - * extensions should be created separately on the workers, types cascading from an - * extension should therefor not be propagated here. - */ - return NIL; - } - QualifyTreeNode((Node *) stmt); sql = DeparseTreeNode((Node *) stmt); @@ -656,16 +608,9 @@ PlanAlterTypeSchemaStmt(AlterObjectSchemaStmt *stmt, const char *queryString) Assert(stmt->objectType == OBJECT_TYPE); - if (creating_extension) - { - /* types from extensions are managed by extensions, skipping */ - return NIL; - } - typeAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!IsObjectDistributed(typeAddress)) + if (!ShouldPropagateAlterType(typeAddress)) { - /* not distributed to the workers, nothing to do */ return NIL; } @@ -696,16 +641,9 @@ ProcessAlterTypeSchemaStmt(AlterObjectSchemaStmt *stmt, const char *queryString) Assert(stmt->objectType == OBJECT_TYPE); - if (creating_extension) - { - /* types from extensions are managed by extensions, skipping */ - return; - } - typeAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!IsObjectDistributed(typeAddress)) + if (!ShouldPropagateAlterType(typeAddress)) { - /* not distributed to the workers, nothing to do */ return; } @@ -730,14 +668,8 @@ PlanAlterTypeOwnerStmt(AlterOwnerStmt *stmt, const char *queryString) Assert(stmt->objectType == OBJECT_TYPE); - if (creating_extension) - { - /* types from extensions are managed by extensions, skipping */ - return NIL; - } - typeAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!IsObjectDistributed(typeAddress)) + if (!ShouldPropagateAlterType(typeAddress)) { return NIL; } @@ -1430,6 +1362,14 @@ EnsureSequentialModeForTypeDDL(void) static bool ShouldPropagateTypeCreate() { + if (!EnableDependencyCreation) + { + /* + * we are configured to disable object propagation, should not propagate anything + */ + return false; + } + if (!EnableCreateTypePropagation) { /* @@ -1459,3 +1399,37 @@ ShouldPropagateTypeCreate() return true; } + + +/* + * ShouldPropagateAlterType determines if we should be propagating type alterations based + * on its object address. + */ +static bool +ShouldPropagateAlterType(const ObjectAddress *address) +{ + if (creating_extension) + { + /* + * extensions should be created separately on the workers, types cascading from an + * extension should therefor not be propagated. + */ + return false; + } + + if (!EnableDependencyCreation) + { + /* + * we are configured to disable object propagation, should not propagate anything + */ + return false; + } + + if (!IsObjectDistributed(address)) + { + /* do not propagate alter types for non-distributed types */ + return false; + } + + return true; +} diff --git a/src/backend/distributed/metadata/dependency.c b/src/backend/distributed/metadata/dependency.c index 235a6cfc1..8a0d6d69a 100644 --- a/src/backend/distributed/metadata/dependency.c +++ b/src/backend/distributed/metadata/dependency.c @@ -19,6 +19,7 @@ #include "catalog/pg_class.h" #include "catalog/pg_depend.h" #include "catalog/pg_type.h" +#include "distributed/commands/utility_hook.h" #include "distributed/metadata/dependency.h" #include "distributed/metadata/distobject.h" #include "utils/fmgroids.h" @@ -60,7 +61,6 @@ static void ApplyAddToDependencyList(void *context, Form_pg_depend pg_depend); static List * ExpandCitusSupportedTypes(void *context, const ObjectAddress *target); /* forward declaration of support functions to decide what to follow */ -static bool SupportedDependencyByCitus(const ObjectAddress *address); static bool IsObjectAddressOwnedByExtension(const ObjectAddress *target); @@ -302,9 +302,32 @@ IsObjectAddressCollected(const ObjectAddress *findAddress, * SupportedDependencyByCitus returns whether citus has support to distribute the object * addressed. */ -static bool +bool SupportedDependencyByCitus(const ObjectAddress *address) { + if (!EnableDependencyCreation) + { + /* + * If the user has disabled object propagation we need to fall back to the legacy + * behaviour in which we only support schema creation + */ + switch (getObjectClass(address)) + { + case OCLASS_SCHEMA: + { + return true; + } + + default: + { + return false; + } + } + + /* should be unreachable */ + Assert(false); + } + /* * looking at the type of a object to see if we know how to create the object on the * workers. diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 00948ad10..b71599c1b 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -693,6 +693,17 @@ RegisterCitusConfigVariables(void) 0, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.enable_object_propagation", + gettext_noop("Enables propagating object creation for more complex objects, " + "schema's will always be created"), + NULL, + &EnableDependencyCreation, + true, + PGC_USERSET, + GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.enable_create_type_propagation", gettext_noop("Enables propagating of CREATE TYPE statements to workers"), diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index 59246947d..f34c2228f 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -28,6 +28,7 @@ typedef enum } PropSetCmdBehavior; extern PropSetCmdBehavior PropagateSetCommands; extern bool EnableDDLPropagation; +extern bool EnableDependencyCreation; extern bool EnableCreateTypePropagation; /* diff --git a/src/include/distributed/metadata/dependency.h b/src/include/distributed/metadata/dependency.h index e4cb406d8..c145b744a 100644 --- a/src/include/distributed/metadata/dependency.h +++ b/src/include/distributed/metadata/dependency.h @@ -19,5 +19,6 @@ extern List * GetDependenciesForObject(const ObjectAddress *target); extern List * OrderObjectAddressListInDependencyOrder(List *objectAddressList); +extern bool SupportedDependencyByCitus(const ObjectAddress *address); #endif /* CITUS_DEPENDENCY_H */ diff --git a/src/test/regress/expected/disable_object_propagation.out b/src/test/regress/expected/disable_object_propagation.out new file mode 100644 index 000000000..f923795df --- /dev/null +++ b/src/test/regress/expected/disable_object_propagation.out @@ -0,0 +1,126 @@ +SET citus.next_shard_id TO 20030000; +SET citus.enable_object_propagation TO false; -- all tests here verify old behaviour without distributing types,functions,etc automatically +CREATE USER typeowner_for_disabled_object_propagation_guc; +NOTICE: not propagating CREATE ROLE/USER commands to worker nodes +HINT: Connect to worker nodes directly to manually create all necessary users and roles. +CREATE SCHEMA disabled_object_propagation; +CREATE SCHEMA disabled_object_propagation2; +SET search_path TO disabled_object_propagation; +-- verify the table gets created, which requires schema distribution to still work +CREATE TABLE t1 (a int PRIMARY KEY , b int); +SELECT create_distributed_table('t1','a'); + create_distributed_table +-------------------------- + +(1 row) + +-- verify types are not created, preventing distributed tables to be created unless created manually on the workers +CREATE TYPE tt1 AS (a int , b int); +CREATE TABLE t2 (a int PRIMARY KEY, b tt1); +SELECT create_distributed_table('t2', 'a'); +ERROR: type "disabled_object_propagation.tt1" does not exist +CONTEXT: while executing command on localhost:57637 +SELECT 1 FROM run_command_on_workers($$ + BEGIN; + SET LOCAL citus.enable_ddl_propagation TO off; + CREATE TYPE disabled_object_propagation.tt1 AS (a int , b int); + COMMIT; +$$); + ?column? +---------- + 1 + 1 +(2 rows) + +SELECT create_distributed_table('t2', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +-- verify enum types are not created, preventing distributed tables to be created unless created manually on the workers +CREATE TYPE tt2 AS ENUM ('a', 'b'); +CREATE TABLE t3 (a int PRIMARY KEY, b tt2); +SELECT create_distributed_table('t3', 'a'); +ERROR: type "disabled_object_propagation.tt2" does not exist +CONTEXT: while executing command on localhost:57637 +SELECT 1 FROM run_command_on_workers($$ + BEGIN; + SET LOCAL citus.enable_ddl_propagation TO off; + CREATE TYPE disabled_object_propagation.tt2 AS ENUM ('a', 'b'); + COMMIT; +$$); + ?column? +---------- + 1 + 1 +(2 rows) + +SELECT create_distributed_table('t3', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +-- verify ALTER TYPE statements are not propagated for types, even though they are marked distributed +BEGIN; +-- object propagation is turned off after xact finished, type is already marked distributed by then +SET LOCAL citus.enable_object_propagation TO on; +CREATE TYPE tt3 AS (a int, b int); +CREATE TABLE t4 (a int PRIMARY KEY, b tt3); +SELECT create_distributed_table('t4','a'); + create_distributed_table +-------------------------- + +(1 row) + +DROP TABLE t4; -- as long as the table is using the type some operations are hard to force +COMMIT; +-- verify the type is distributed +SELECT count(*) FROM citus.pg_dist_object WHERE objid = 'disabled_object_propagation.tt3'::regtype::oid; + count +------- + 1 +(1 row) + +ALTER TYPE tt3 ADD ATTRIBUTE c int, DROP ATTRIBUTE b, ALTER ATTRIBUTE a SET DATA TYPE text COLLATE "en_US"; +ALTER TYPE tt3 OWNER TO typeowner_for_disabled_object_propagation_guc; +ALTER TYPE tt3 RENAME ATTRIBUTE c TO d; +ALTER TYPE tt3 RENAME TO tt4; +ALTER TYPE tt4 SET SCHEMA disabled_object_propagation2; +DROP TYPE disabled_object_propagation2.tt4; +-- verify no changes have been made to the type on the remote servers. tt3 is used as a name since the rename should not have been propagated +SELECT run_command_on_workers($$ +SELECT row(nspname, typname, usename) + FROM pg_type + JOIN pg_user ON (typowner = usesysid) + JOIN pg_namespace ON (pg_namespace.oid = typnamespace) + WHERE typname = 'tt3'; +$$); + run_command_on_workers +------------------------------------------------------------------ + (localhost,57637,t,"(disabled_object_propagation,tt3,postgres)") + (localhost,57638,t,"(disabled_object_propagation,tt3,postgres)") +(2 rows) + +SELECT run_command_on_workers($$ + SELECT row(pg_type.typname, string_agg(attname || ' ' || atttype.typname, ', ' ORDER BY attnum)) + FROM pg_attribute + JOIN pg_class ON (attrelid = pg_class.oid) + JOIN pg_type ON (pg_class.reltype = pg_type.oid) + JOIN pg_type AS atttype ON (atttypid = atttype.oid) + WHERE pg_type.typname = 'tt3' +GROUP BY pg_type.typname; +$$); + run_command_on_workers +------------------------------------------------ + (localhost,57637,t,"(tt3,""a int4, b int4"")") + (localhost,57638,t,"(tt3,""a int4, b int4"")") +(2 rows) + +-- suppress any warnings during cleanup +SET client_min_messages TO fatal; +RESET citus.enable_object_propagation; +DROP SCHEMA disabled_object_propagation CASCADE; +DROP SCHEMA disabled_object_propagation2 CASCADE; +DROP USER typeowner_for_disabled_object_propagation_guc; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 0e56f0eab..1cb8334b4 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -280,5 +280,5 @@ test: ssl_by_default # --------- # object distribution tests # --------- -test: distributed_types distributed_types_conflict +test: distributed_types distributed_types_conflict disable_object_propagation test: distributed_functions diff --git a/src/test/regress/sql/disable_object_propagation.sql b/src/test/regress/sql/disable_object_propagation.sql new file mode 100644 index 000000000..8f82f89ec --- /dev/null +++ b/src/test/regress/sql/disable_object_propagation.sql @@ -0,0 +1,82 @@ +SET citus.next_shard_id TO 20030000; +SET citus.enable_object_propagation TO false; -- all tests here verify old behaviour without distributing types,functions,etc automatically + +CREATE USER typeowner_for_disabled_object_propagation_guc; +CREATE SCHEMA disabled_object_propagation; +CREATE SCHEMA disabled_object_propagation2; +SET search_path TO disabled_object_propagation; + +-- verify the table gets created, which requires schema distribution to still work +CREATE TABLE t1 (a int PRIMARY KEY , b int); +SELECT create_distributed_table('t1','a'); + +-- verify types are not created, preventing distributed tables to be created unless created manually on the workers +CREATE TYPE tt1 AS (a int , b int); +CREATE TABLE t2 (a int PRIMARY KEY, b tt1); +SELECT create_distributed_table('t2', 'a'); +SELECT 1 FROM run_command_on_workers($$ + BEGIN; + SET LOCAL citus.enable_ddl_propagation TO off; + CREATE TYPE disabled_object_propagation.tt1 AS (a int , b int); + COMMIT; +$$); +SELECT create_distributed_table('t2', 'a'); + +-- verify enum types are not created, preventing distributed tables to be created unless created manually on the workers +CREATE TYPE tt2 AS ENUM ('a', 'b'); +CREATE TABLE t3 (a int PRIMARY KEY, b tt2); +SELECT create_distributed_table('t3', 'a'); +SELECT 1 FROM run_command_on_workers($$ + BEGIN; + SET LOCAL citus.enable_ddl_propagation TO off; + CREATE TYPE disabled_object_propagation.tt2 AS ENUM ('a', 'b'); + COMMIT; +$$); +SELECT create_distributed_table('t3', 'a'); + +-- verify ALTER TYPE statements are not propagated for types, even though they are marked distributed +BEGIN; +-- object propagation is turned off after xact finished, type is already marked distributed by then +SET LOCAL citus.enable_object_propagation TO on; +CREATE TYPE tt3 AS (a int, b int); +CREATE TABLE t4 (a int PRIMARY KEY, b tt3); +SELECT create_distributed_table('t4','a'); +DROP TABLE t4; -- as long as the table is using the type some operations are hard to force +COMMIT; + +-- verify the type is distributed +SELECT count(*) FROM citus.pg_dist_object WHERE objid = 'disabled_object_propagation.tt3'::regtype::oid; + +ALTER TYPE tt3 ADD ATTRIBUTE c int, DROP ATTRIBUTE b, ALTER ATTRIBUTE a SET DATA TYPE text COLLATE "en_US"; +ALTER TYPE tt3 OWNER TO typeowner_for_disabled_object_propagation_guc; +ALTER TYPE tt3 RENAME ATTRIBUTE c TO d; +ALTER TYPE tt3 RENAME TO tt4; +ALTER TYPE tt4 SET SCHEMA disabled_object_propagation2; +DROP TYPE disabled_object_propagation2.tt4; + +-- verify no changes have been made to the type on the remote servers. tt3 is used as a name since the rename should not have been propagated +SELECT run_command_on_workers($$ +SELECT row(nspname, typname, usename) + FROM pg_type + JOIN pg_user ON (typowner = usesysid) + JOIN pg_namespace ON (pg_namespace.oid = typnamespace) + WHERE typname = 'tt3'; +$$); + +SELECT run_command_on_workers($$ + SELECT row(pg_type.typname, string_agg(attname || ' ' || atttype.typname, ', ' ORDER BY attnum)) + FROM pg_attribute + JOIN pg_class ON (attrelid = pg_class.oid) + JOIN pg_type ON (pg_class.reltype = pg_type.oid) + JOIN pg_type AS atttype ON (atttypid = atttype.oid) + WHERE pg_type.typname = 'tt3' +GROUP BY pg_type.typname; +$$); + +-- suppress any warnings during cleanup +SET client_min_messages TO fatal; +RESET citus.enable_object_propagation; +DROP SCHEMA disabled_object_propagation CASCADE; +DROP SCHEMA disabled_object_propagation2 CASCADE; +DROP USER typeowner_for_disabled_object_propagation_guc; +