From 39df51e903510f83fbc9668c8c44b9ee83f99be4 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Fri, 7 Feb 2020 18:07:59 +0300 Subject: [PATCH] Introduce objects to dist. infrastructure when updating Citus (#3477) Mark existing objects that are not included in distributed object infrastructure in older versions of Citus (but now should be) as distributed, after updating Citus successfully. --- .../distributed/commands/dependencies.c | 39 +++++++ src/backend/distributed/commands/extension.c | 110 ++++++++++++++++++ .../distributed/commands/utility_hook.c | 29 ++++- src/backend/distributed/metadata/dependency.c | 28 +++++ .../distributed/metadata/metadata_cache.c | 24 +++- src/include/distributed/commands.h | 1 + .../distributed/master_metadata_utility.h | 1 + src/include/distributed/metadata/dependency.h | 1 + src/include/distributed/metadata_cache.h | 1 + .../after_citus_upgrade_coord_schedule | 1 + .../before_citus_upgrade_coord_schedule | 1 + .../upgrade_pg_dist_object_test_after.out | 33 ++++++ .../upgrade_pg_dist_object_test_before.out | 74 ++++++++++++ .../sql/upgrade_pg_dist_object_test_after.sql | 18 +++ .../upgrade_pg_dist_object_test_before.sql | 44 +++++++ 15 files changed, 401 insertions(+), 4 deletions(-) create mode 100644 src/test/regress/expected/upgrade_pg_dist_object_test_after.out create mode 100644 src/test/regress/expected/upgrade_pg_dist_object_test_before.out create mode 100644 src/test/regress/sql/upgrade_pg_dist_object_test_after.sql create mode 100644 src/test/regress/sql/upgrade_pg_dist_object_test_before.sql diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index d13105763..d41cf2168 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -135,6 +135,45 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) } +/* + * GetDistributableDependenciesForObject finds all the dependencies that Citus + * can 
distribute and returns those dependencies regardless of their existence + * on nodes. + */ +List * +GetDistributableDependenciesForObject(const ObjectAddress *target) +{ + /* local variables to work with dependencies */ + List *distributableDependencies = NIL; + ListCell *dependencyCell = NULL; + + /* collect all dependencies in creation order */ + List *dependencies = GetDependenciesForObject(target); + + /* filter the ones that can be distributed */ + foreach(dependencyCell, dependencies) + { + ObjectAddress *dependency = (ObjectAddress *) lfirst(dependencyCell); + + /* + * TODO: maybe we can optimize the logic applied in below line. Actually we + * do not need to create ddl commands as we are not ensuring their existence + * in nodes, but we utilize logic it follows to choose the objects that could + * be distributed + */ + List *dependencyCommands = GetDependencyCreateDDLCommands(dependency); + + /* create a new list with dependencies that actually created commands */ + if (list_length(dependencyCommands) > 0) + { + distributableDependencies = lappend(distributableDependencies, dependency); + } + } + + return distributableDependencies; +} + + /* * GetDependencyCreateDDLCommands returns a list (potentially empty or NIL) of ddl * commands to execute on a worker to create the object. 
diff --git a/src/backend/distributed/commands/extension.c b/src/backend/distributed/commands/extension.c index 5add3f147..85fd0bbe9 100644 --- a/src/backend/distributed/commands/extension.c +++ b/src/backend/distributed/commands/extension.c @@ -17,7 +17,9 @@ #include "distributed/commands.h" #include "distributed/commands/utility_hook.h" #include "distributed/deparser.h" +#include "distributed/master_protocol.h" #include "distributed/metadata_sync.h" +#include "distributed/metadata/dependency.h" #include "distributed/metadata/distobject.h" #include "distributed/multi_executor.h" #include "distributed/relation_access_tracking.h" @@ -32,6 +34,7 @@ static char * ExtractNewExtensionVersion(Node *parseTree); static void AddSchemaFieldIfMissing(CreateExtensionStmt *stmt); static List * FilterDistributedExtensions(List *extensionObjectList); static List * ExtensionNameListToObjectAddressList(List *extensionObjectList); +static void MarkExistingObjectDependenciesDistributedIfSupported(void); static void EnsureSequentialModeForExtensionDDL(void); static bool ShouldPropagateExtensionCommand(Node *parseTree); static bool IsDropCitusStmt(Node *parseTree); @@ -453,6 +456,7 @@ List * PreprocessAlterExtensionUpdateStmt(Node *node, const char *queryString) { AlterExtensionStmt *alterExtensionStmt = castNode(AlterExtensionStmt, node); + if (!ShouldPropagateExtensionCommand((Node *) alterExtensionStmt)) { return NIL; @@ -491,6 +495,112 @@ PreprocessAlterExtensionUpdateStmt(Node *node, const char *queryString) } +/* + * PostprocessAlterExtensionCitusUpdateStmt is called after ALTER EXTENSION + * citus UPDATE command is executed by standard utility hook. + * + * Actually, we do not need such a post process function for ALTER EXTENSION + * UPDATE commands unless the extension is Citus itself. 
This is because we + * need to mark existing objects that are not included in distributed object + * infrastructure in older versions of Citus (but now should be) as distributed + * if we really updated Citus to the available version successfully via standard + * utility hook. + */ +void +PostprocessAlterExtensionCitusUpdateStmt(Node *node) +{ + /* + * We should not postprocess this command in workers as they do not keep track + * of citus.pg_dist_object. + */ + if (!IsCoordinator()) + { + return; + } + + bool citusIsUpdatedToLatestVersion = InstalledAndAvailableVersionsSame(); + + /* + * Knowing that Citus version was different than the available version before + * standard process utility runs ALTER EXTENSION command, we perform post + * process operations if Citus is updated to that available version + */ + if (!citusIsUpdatedToLatestVersion) + { + return; + } + + /* + * Finally, mark existing objects that are not included in distributed object + * infrastructure in older versions of Citus (but now should be) as distributed + */ + MarkExistingObjectDependenciesDistributedIfSupported(); +} + + +/* + * MarkExistingObjectDependenciesDistributedIfSupported marks all objects that could + * be distributed by resolving dependencies of "existing distributed tables" and + * "already distributed objects" to introduce the objects created in older versions + * of Citus to distributed object infrastructure as well. + * + * Note that this function is not responsible for ensuring if dependencies exist on + * nodes and satisfying these dependencies if they do not exist, which is already done by + * EnsureDependenciesExistOnAllNodes on demand. Hence, this function is just designed + * to be used when "ALTER EXTENSION citus UPDATE" is executed. + * This is because we want to add existing objects that would have already been in + * pg_dist_object if we had created them in new version of Citus to pg_dist_object. 
+ */ +static void +MarkExistingObjectDependenciesDistributedIfSupported() +{ + ListCell *listCell = NULL; + + /* resulting object addresses to be marked as distributed */ + List *resultingObjectAddresses = NIL; + + /* resolve dependencies of distributed tables */ + List *distributedTableOidList = DistTableOidList(); + + foreach(listCell, distributedTableOidList) + { + Oid distributedTableOid = lfirst_oid(listCell); + + ObjectAddress tableAddress = { 0 }; + ObjectAddressSet(tableAddress, RelationRelationId, distributedTableOid); + + List *distributableDependencyObjectAddresses = + GetDistributableDependenciesForObject(&tableAddress); + + resultingObjectAddresses = list_concat(resultingObjectAddresses, + distributableDependencyObjectAddresses); + } + + /* resolve dependencies of the objects in pg_dist_object*/ + List *distributedObjectAddressList = GetDistributedObjectAddressList(); + + foreach(listCell, distributedObjectAddressList) + { + ObjectAddress *distributedObjectAddress = (ObjectAddress *) lfirst(listCell); + + List *distributableDependencyObjectAddresses = + GetDistributableDependenciesForObject(distributedObjectAddress); + + resultingObjectAddresses = list_concat(resultingObjectAddresses, + distributableDependencyObjectAddresses); + } + + /* remove duplicates from object addresses list for efficiency */ + List *uniqueObjectAddresses = GetUniqueDependenciesList(resultingObjectAddresses); + + foreach(listCell, uniqueObjectAddresses) + { + ObjectAddress *objectAddress = (ObjectAddress *) lfirst(listCell); + MarkObjectDistributed(objectAddress); + } +} + + /* * PreprocessAlterExtensionContentsStmt issues a notice. It does not propagate. 
*/ diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 48956641a..7323d2b82 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -140,9 +140,9 @@ multi_ProcessUtility(PlannedStmt *pstmt, return; } - bool checkCreateAlterExtensionVersion = IsCreateAlterExtensionUpdateCitusStmt( + bool isCreateAlterExtensionUpdateCitusStmt = IsCreateAlterExtensionUpdateCitusStmt( parsetree); - if (EnableVersionChecks && checkCreateAlterExtensionVersion) + if (EnableVersionChecks && isCreateAlterExtensionUpdateCitusStmt) { ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree); } @@ -466,9 +466,34 @@ multi_ProcessUtility(PlannedStmt *pstmt, activeDropSchemaOrDBs++; } + /* + * Check if we are running ALTER EXTENSION citus UPDATE (TO "<new_version>") command and + * the available version is different than the current version of Citus. In this case, + * ALTER EXTENSION citus UPDATE command can actually update Citus to a new version. 
+ */ + bool isAlterExtensionUpdateCitusStmt = isCreateAlterExtensionUpdateCitusStmt && + IsA(parsetree, AlterExtensionStmt); + + bool citusCanBeUpdatedToAvailableVersion = false; + + if (isAlterExtensionUpdateCitusStmt) + { + citusCanBeUpdatedToAvailableVersion = !InstalledAndAvailableVersionsSame(); + } + standard_ProcessUtility(pstmt, queryString, context, params, queryEnv, dest, completionTag); + /* + * if we are running ALTER EXTENSION citus UPDATE (to "<new_version>") command, we may need + * to mark existing objects as distributed depending on the "version" parameter if + * specified in "ALTER EXTENSION citus UPDATE" command + */ + if (isAlterExtensionUpdateCitusStmt && citusCanBeUpdatedToAvailableVersion) + { + PostprocessAlterExtensionCitusUpdateStmt(parsetree); + } + /* * Postgres added the following CommandCounterIncrement as a patch in: * - 10.7 -> 10.8 diff --git a/src/backend/distributed/metadata/dependency.c b/src/backend/distributed/metadata/dependency.c index e4e0167df..af8792132 100644 --- a/src/backend/distributed/metadata/dependency.c +++ b/src/backend/distributed/metadata/dependency.c @@ -20,6 +20,7 @@ #include "catalog/pg_depend.h" #include "catalog/pg_type.h" #include "distributed/commands/utility_hook.h" +#include "distributed/listutils.h" #include "distributed/metadata/dependency.h" #include "distributed/metadata/distobject.h" #include "utils/fmgroids.h" @@ -74,6 +75,33 @@ static List * ExpandCitusSupportedTypes(ObjectAddressCollector *collector, const ObjectAddress *target); +/* + * GetUniqueDependenciesList takes a list of object addresses and returns a new list + * of ObjectAddresses whose elements are unique. 
+ */ +List * +GetUniqueDependenciesList(List *objectAddressesList) +{ + ObjectAddressCollector objectAddressCollector = { 0 }; + InitObjectAddressCollector(&objectAddressCollector); + + ObjectAddress *objectAddress = NULL; + + foreach_ptr(objectAddress, objectAddressesList) + { + if (IsObjectAddressCollected(objectAddress, &objectAddressCollector)) + { + /* skip objects that are already collected */ + continue; + } + + CollectObjectAddress(&objectAddressCollector, objectAddress); + } + + return objectAddressCollector.dependencyList; +} + + /* * GetDependenciesForObject returns a list of ObjectAddesses to be created in order * before the target object could safely be created on a worker. Some of the object might diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index e43663ab8..f455b2ecd 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -1627,7 +1627,7 @@ CheckAvailableVersion(int elevel) /* * CheckInstalledVersion compares CITUS_EXTENSIONVERSION and the - * extension's current version from the pg_extemsion catalog table. If they + * extension's current version from the pg_extension catalog table. If they * are not compatible, this function logs an error with the specified elevel, * otherwise it returns true. */ @@ -1654,6 +1654,26 @@ CheckInstalledVersion(int elevel) } +/* + * InstalledAndAvailableVersionsSame compares extension's available version and + * its current version from the pg_extension catalog table. If they are not same + * returns false, otherwise returns true. + */ +bool +InstalledAndAvailableVersionsSame() +{ + char *installedVersion = InstalledExtensionVersion(); + char *availableVersion = AvailableExtensionVersion(); + + if (strncmp(installedVersion, availableVersion, NAMEDATALEN) == 0) + { + return true; + } + + return false; +} + + /* * MajorVersionsCompatible checks whether both versions are compatible. 
They * are if major and minor version numbers match, the schema version is @@ -1904,7 +1924,7 @@ CitusCatalogNamespaceId(void) } -/* return oid of pg_dist_shard relation */ +/* return oid of pg_dist_object relation */ Oid DistObjectRelationId(void) { diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index cadc282c6..ba00a5f13 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -81,6 +81,7 @@ extern List * PostprocessAlterExtensionSchemaStmt(Node *stmt, const char *queryString); extern List * PreprocessAlterExtensionUpdateStmt(Node *stmt, const char *queryString); +extern void PostprocessAlterExtensionCitusUpdateStmt(Node *node); extern List * PreprocessAlterExtensionContentsStmt(Node *node, const char *queryString); extern List * CreateExtensionDDLCommand(const ObjectAddress *extensionAddress); diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index c5b8bc941..ca0d60f7c 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -138,6 +138,7 @@ extern void CreateDistributedTable(Oid relationId, Var *distributionColumn, extern void CreateTruncateTrigger(Oid relationId); extern void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target); +extern List * GetDistributableDependenciesForObject(const ObjectAddress *target); extern bool ShouldPropagate(void); extern bool ShouldPropagateObject(const ObjectAddress *address); extern void ReplicateAllDependenciesToNode(const char *nodeName, int nodePort); diff --git a/src/include/distributed/metadata/dependency.h b/src/include/distributed/metadata/dependency.h index 93a5d0977..304fab619 100644 --- a/src/include/distributed/metadata/dependency.h +++ b/src/include/distributed/metadata/dependency.h @@ -17,6 +17,7 @@ #include "catalog/objectaddress.h" #include "nodes/pg_list.h" +extern List * GetUniqueDependenciesList(List 
*objectAddressesList); extern List * GetDependenciesForObject(const ObjectAddress *target); extern List * OrderObjectAddressListInDependencyOrder(List *objectAddressList); extern bool SupportedDependencyByCitus(const ObjectAddress *address); diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 17301f015..42ae63fc2 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -150,6 +150,7 @@ extern bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray, extern bool CitusHasBeenLoaded(void); extern bool CheckCitusVersion(int elevel); extern bool CheckAvailableVersion(int elevel); +extern bool InstalledAndAvailableVersionsSame(void); extern bool MajorVersionsCompatible(char *leftVersion, char *rightVersion); extern void ErrorIfInconsistentShardIntervals(DistTableCacheEntry *cacheEntry); extern void EnsureModificationsCanRun(void); diff --git a/src/test/regress/after_citus_upgrade_coord_schedule b/src/test/regress/after_citus_upgrade_coord_schedule index 5884e109e..2fb17f010 100644 --- a/src/test/regress/after_citus_upgrade_coord_schedule +++ b/src/test/regress/after_citus_upgrade_coord_schedule @@ -1,3 +1,4 @@ # this schedule is to be run only on coordinators test: upgrade_basic_after +test: upgrade_pg_dist_object_test_after diff --git a/src/test/regress/before_citus_upgrade_coord_schedule b/src/test/regress/before_citus_upgrade_coord_schedule index 1587f5725..38dbc4267 100644 --- a/src/test/regress/before_citus_upgrade_coord_schedule +++ b/src/test/regress/before_citus_upgrade_coord_schedule @@ -1,3 +1,4 @@ # this schedule is to be run on only coordinators test: upgrade_basic_before +test: upgrade_pg_dist_object_test_before diff --git a/src/test/regress/expected/upgrade_pg_dist_object_test_after.out b/src/test/regress/expected/upgrade_pg_dist_object_test_after.out new file mode 100644 index 000000000..0461795b7 --- /dev/null +++ 
b/src/test/regress/expected/upgrade_pg_dist_object_test_after.out @@ -0,0 +1,33 @@ +-- drop objects from previous test (uprade_basic_after.sql) for a clean test +-- drop upgrade_basic schema and switch back to public schema +SET search_path to public; +DROP SCHEMA upgrade_basic CASCADE; +NOTICE: drop cascades to 7 other objects +DETAIL: drop cascades to table upgrade_basic.t +drop cascades to table upgrade_basic.tp +drop cascades to table upgrade_basic.t_ab +drop cascades to table upgrade_basic.t2 +drop cascades to table upgrade_basic.r +drop cascades to table upgrade_basic.tr +drop cascades to table upgrade_basic.t_append +-- as we updated citus to available version, +-- "isn" extension +-- "new_schema" schema +-- "public" schema +-- "fooschema" schema +-- "footype" type (under schema 'fooschema') + -- will now be marked as distributed + -- but, + -- "seg" extension + -- will not be marked as distributed +-- see underlying objects +SELECT i.* FROM citus.pg_dist_object, pg_identify_object_as_address(classid, objid, objsubid) i ORDER BY 1, 2, 3; + type | object_names | object_args +--------------------------------------------------------------------- + extension | {isn} | {} + schema | {fooschema} | {} + schema | {new_schema} | {} + schema | {public} | {} + type | {fooschema.footype} | {} + (5 rows) + diff --git a/src/test/regress/expected/upgrade_pg_dist_object_test_before.out b/src/test/regress/expected/upgrade_pg_dist_object_test_before.out new file mode 100644 index 000000000..2bdcedf5c --- /dev/null +++ b/src/test/regress/expected/upgrade_pg_dist_object_test_before.out @@ -0,0 +1,74 @@ +-- create some objects that we just included into distributed object +-- infrastructure in 9.1 versions but not included in 9.0.2 +-- extension propagation -- +-- create an extension on all nodes and a table that depends on it +CREATE EXTENSION isn; +SELECT run_command_on_workers($$CREATE EXTENSION isn;$$); + run_command_on_workers 
+--------------------------------------------------------------------- + (localhost,57636,t,"CREATE EXTENSION") + (localhost,57637,t,"CREATE EXTENSION") +(2 rows) + +CREATE TABLE isn_dist_table (key int, value issn); +SELECT create_reference_table('isn_dist_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +-- create an extension on all nodes, but do not create a table depending on it +CREATE EXTENSION seg; +SELECT run_command_on_workers($$CREATE EXTENSION seg;$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57636,t,"CREATE EXTENSION") + (localhost,57637,t,"CREATE EXTENSION") +(2 rows) + +-- schema propagation -- +-- public schema +CREATE TABLE dist_table (a int); +SELECT create_reference_table('dist_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +-- custom schema +CREATE SCHEMA new_schema; +SET search_path to new_schema; +CREATE TABLE another_dist_table (a int); +SELECT create_reference_table('another_dist_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +-- another custom schema and a type +-- create table that depends both on a type & schema here (actually type depends on the schema) +-- here we test if schema is marked as distributed successfully. 
+-- This is because tracking the dependencies will hit to the schema for two times +CREATE SCHEMA fooschema; +CREATE TYPE fooschema.footype AS (x int, y int); +SELECT run_command_on_workers($$CREATE SCHEMA fooschema;$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57636,t,"CREATE SCHEMA") + (localhost,57637,t,"CREATE SCHEMA") + (2 rows) + + SELECT run_command_on_workers($$CREATE TYPE fooschema.footype AS (x int, y int);$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57636,t,"CREATE TYPE") + (localhost,57637,t,"CREATE TYPE") + (2 rows) + + CREATE TABLE fooschema.footable (f fooschema.footype); + SELECT create_reference_table('fooschema.footable'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + diff --git a/src/test/regress/sql/upgrade_pg_dist_object_test_after.sql b/src/test/regress/sql/upgrade_pg_dist_object_test_after.sql new file mode 100644 index 000000000..a086361d1 --- /dev/null +++ b/src/test/regress/sql/upgrade_pg_dist_object_test_after.sql @@ -0,0 +1,18 @@ +-- drop objects from previous test (uprade_basic_after.sql) for a clean test +-- drop upgrade_basic schema and switch back to public schema +SET search_path to public; +DROP SCHEMA upgrade_basic CASCADE; + +-- as we updated citus to available version, +-- "isn" extension +-- "new_schema" schema +-- "public" schema +-- "fooschema" schema +-- "footype" type (under schema 'fooschema') +-- will now be marked as distributed +-- but, +-- "seg" extension +-- will not be marked as distributed + +-- see underlying objects +SELECT i.* FROM citus.pg_dist_object, pg_identify_object_as_address(classid, objid, objsubid) i ORDER BY 1, 2, 3; diff --git a/src/test/regress/sql/upgrade_pg_dist_object_test_before.sql b/src/test/regress/sql/upgrade_pg_dist_object_test_before.sql new file mode 100644 index 000000000..2244914fb --- 
/dev/null +++ b/src/test/regress/sql/upgrade_pg_dist_object_test_before.sql @@ -0,0 +1,44 @@ +-- create some objects that we just included into distributed object +-- infrastructure in 9.1 versions but not included in 9.0.2 + +-- extension propagation -- + +-- create an extension on all nodes and a table that depends on it +CREATE EXTENSION isn; +SELECT run_command_on_workers($$CREATE EXTENSION isn;$$); + +CREATE TABLE isn_dist_table (key int, value issn); +SELECT create_reference_table('isn_dist_table'); + +-- create an extension on all nodes, but do not create a table depending on it +CREATE EXTENSION seg; +SELECT run_command_on_workers($$CREATE EXTENSION seg;$$); + +-- schema propagation -- + +-- public schema +CREATE TABLE dist_table (a int); +SELECT create_reference_table('dist_table'); + +-- custom schema +CREATE SCHEMA new_schema; + +SET search_path to new_schema; + +CREATE TABLE another_dist_table (a int); +SELECT create_reference_table('another_dist_table'); + +-- another custom schema and a type + +-- create table that depends both on a type & schema here (actually type depends on the schema) +-- here we test if schema is marked as distributed successfully. +-- This is because tracking the dependencies will hit to the schema for two times + +CREATE SCHEMA fooschema; +CREATE TYPE fooschema.footype AS (x int, y int); + +SELECT run_command_on_workers($$CREATE SCHEMA fooschema;$$); +SELECT run_command_on_workers($$CREATE TYPE fooschema.footype AS (x int, y int);$$); + +CREATE TABLE fooschema.footable (f fooschema.footype); +SELECT create_reference_table('fooschema.footable');