diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index a77a33b00..228b2efbe 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -166,11 +166,28 @@ EnsureDependenciesCanBeDistributed(const ObjectAddress *objectAddress) /* - * ErrorIfCircularDependencyExists checks whether given object has circular dependency - * with itself via existing objects of pg_dist_object. + * ErrorIfCircularDependencyExists is a wrapper around + * DeferErrorIfCircularDependencyExists(), and throws error + * if circular dependency exists. */ static void ErrorIfCircularDependencyExists(const ObjectAddress *objectAddress) +{ + DeferredErrorMessage *depError = + DeferErrorIfCircularDependencyExists(objectAddress); + if (depError != NULL) + { + RaiseDeferredError(depError, ERROR); + } +} + + +/* + * DeferErrorIfCircularDependencyExists checks whether given object has + * circular dependency with itself via existing objects of pg_dist_object. + */ +DeferredErrorMessage * +DeferErrorIfCircularDependencyExists(const ObjectAddress *objectAddress) { List *dependencies = GetAllSupportedDependenciesForObject(objectAddress); @@ -189,13 +206,18 @@ ErrorIfCircularDependencyExists(const ObjectAddress *objectAddress) objectDescription = getObjectDescription(objectAddress); #endif - ereport(ERROR, (errmsg("Citus can not handle circular dependencies " - "between distributed objects"), - errdetail("\"%s\" circularly depends itself, resolve " - "circular dependency first", - objectDescription))); + StringInfo detailInfo = makeStringInfo(); + appendStringInfo(detailInfo, "\"%s\" circularly depends itself, resolve " + "circular dependency first", objectDescription); + + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "Citus can not handle circular dependencies " + "between distributed objects", detailInfo->data, + NULL); } } + + return NULL; } diff --git a/src/backend/distributed/commands/extension.c b/src/backend/distributed/commands/extension.c index e4b720b4c..1e48d0c14 100644 --- a/src/backend/distributed/commands/extension.c +++ b/src/backend/distributed/commands/extension.c @@ -10,6 +10,7 @@ #include "postgres.h" +#include "access/genam.h" #include "citus_version.h" #include "catalog/pg_extension_d.h" #include "commands/defrem.h" @@ -37,6 +38,8 @@ static void AddSchemaFieldIfMissing(CreateExtensionStmt *stmt); static List * FilterDistributedExtensions(List *extensionObjectList); static List * ExtensionNameListToObjectAddressList(List *extensionObjectList); static void MarkExistingObjectDependenciesDistributedIfSupported(void); +static List * GetAllViews(void); +static bool ShouldMarkRelationDistributedOnUpgrade(Oid relationId); static bool ShouldPropagateExtensionCommand(Node *parseTree); static bool IsAlterExtensionSetSchemaCitus(Node *parseTree); static Node * RecreateExtensionStmt(Oid extensionOid); @@ -510,27 +513,78 @@ MarkExistingObjectDependenciesDistributedIfSupported() Oid citusTableId = InvalidOid; foreach_oid(citusTableId, citusTableIdList) { - ObjectAddress tableAddress = { 0 }; - ObjectAddressSet(tableAddress, RelationRelationId, citusTableId); + if (!ShouldMarkRelationDistributedOnUpgrade(citusTableId)) + { + continue; + } /* refrain reading the metadata cache for all tables */ if (ShouldSyncTableMetadataViaCatalog(citusTableId)) { - /* we need to pass pointer allocated in the heap */ - ObjectAddress *addressPointer = palloc0(sizeof(ObjectAddress)); - *addressPointer = tableAddress; + ObjectAddress tableAddress = { 0 }; + ObjectAddressSet(tableAddress, RelationRelationId, citusTableId); - /* as of Citus 11, tables that should be synced are also considered object */ - resultingObjectAddresses = lappend(resultingObjectAddresses, addressPointer); + /* + * We mark tables distributed immediately because we also need to mark + * views as distributed. We check whether the views that depend on + * the table has any auto-distirbutable dependencies below. Citus + * currently cannot "auto" distribute tables as dependencies, so we + * mark them distributed immediately. + */ + MarkObjectDistributedLocally(&tableAddress); + + /* + * All the distributable dependencies of a table should be marked as + * distributed. + */ + List *distributableDependencyObjectAddresses = + GetDistributableDependenciesForObject(&tableAddress); + + resultingObjectAddresses = + list_concat(resultingObjectAddresses, + distributableDependencyObjectAddresses); + } + } + + /* + * As of Citus 11, views on Citus tables that do not have any unsupported + * dependency should also be distributed. + * + * In general, we mark views distributed as long as it does not have + * any unsupported dependencies. + */ + List *viewList = GetAllViews(); + Oid viewOid = InvalidOid; + foreach_oid(viewOid, viewList) + { + if (!ShouldMarkRelationDistributedOnUpgrade(viewOid)) + { + continue; } - List *distributableDependencyObjectAddresses = - GetDistributableDependenciesForObject(&tableAddress); + ObjectAddress viewAddress = { 0 }; + ObjectAddressSet(viewAddress, RelationRelationId, viewOid); - resultingObjectAddresses = list_concat(resultingObjectAddresses, - distributableDependencyObjectAddresses); + /* + * If a view depends on multiple views, that view will be marked + * as distributed while it is processed for the last view + * table. + */ + MarkObjectDistributedLocally(&viewAddress); + + /* we need to pass pointer allocated in the heap */ + ObjectAddress *addressPointer = palloc0(sizeof(ObjectAddress)); + *addressPointer = viewAddress; + + List *distributableDependencyObjectAddresses = + GetDistributableDependenciesForObject(&viewAddress); + + resultingObjectAddresses = + list_concat(resultingObjectAddresses, + distributableDependencyObjectAddresses); } + /* resolve dependencies of the objects in pg_dist_object*/ List *distributedObjectAddressList = GetDistributedObjectAddressList(); @@ -566,6 +620,85 @@ MarkExistingObjectDependenciesDistributedIfSupported() } +/* + * GetAllViews returns list of view oids that exists on this server. + */ +static List * +GetAllViews(void) +{ + List *viewOidList = NIL; + + Relation pgClass = table_open(RelationRelationId, AccessShareLock); + + SysScanDesc scanDescriptor = systable_beginscan(pgClass, InvalidOid, false, NULL, + 0, NULL); + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + while (HeapTupleIsValid(heapTuple)) + { + Form_pg_class relationForm = (Form_pg_class) GETSTRUCT(heapTuple); + + /* we're only interested in views */ + if (relationForm->relkind == RELKIND_VIEW) + { + viewOidList = lappend_oid(viewOidList, relationForm->oid); + } + + heapTuple = systable_getnext(scanDescriptor); + } + + systable_endscan(scanDescriptor); + table_close(pgClass, NoLock); + + return viewOidList; +} + + +/* + * ShouldMarkRelationDistributedOnUpgrade is a helper function that + * decides whether the input relation should be marked as distributed + * during the upgrade. + */ +static bool +ShouldMarkRelationDistributedOnUpgrade(Oid relationId) +{ + if (!EnableMetadataSync) + { + /* + * Just in case anything goes wrong, we should still be able + * to continue to the version upgrade. + */ + return false; + } + + ObjectAddress relationAddress = { 0 }; + ObjectAddressSet(relationAddress, RelationRelationId, relationId); + + bool pgObject = (relationId < FirstNormalObjectId); + bool ownedByExtension = IsTableOwnedByExtension(relationId); + bool alreadyDistributed = IsObjectDistributed(&relationAddress); + bool hasUnsupportedDependency = + DeferErrorIfHasUnsupportedDependency(&relationAddress) != NULL; + bool hasCircularDependency = + DeferErrorIfCircularDependencyExists(&relationAddress) != NULL; + + /* + * pgObject: Citus never marks pg objects as distributed + * ownedByExtension: let extensions manage its own objects + * alreadyDistributed: most likely via earlier versions + * hasUnsupportedDependency: Citus doesn't know how to distribute its dependencies + * hasCircularDependency: Citus cannot handle circular dependencies + */ + if (pgObject || ownedByExtension || alreadyDistributed || + hasUnsupportedDependency || hasCircularDependency) + { + return false; + } + + return true; +} + + /* * PreprocessAlterExtensionContentsStmt issues a notice. It does not propagate. */ diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c index b345210af..172691b7d 100644 --- a/src/backend/distributed/metadata/distobject.c +++ b/src/backend/distributed/metadata/distobject.c @@ -46,7 +46,6 @@ #include "utils/rel.h" -static void MarkObjectDistributedLocally(const ObjectAddress *distAddress); static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress); static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes, Datum *paramValues); @@ -195,7 +194,7 @@ MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress) * This function should never be called alone, MarkObjectDistributed() or * MarkObjectDistributedViaSuperUser() should be called. */ -static void +void MarkObjectDistributedLocally(const ObjectAddress *distAddress) { int paramCount = 3; diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index af80df0c4..a2a1fb2b0 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -152,6 +152,7 @@ typedef struct MetadataCacheData Oid distShardShardidIndexId; Oid distPlacementShardidIndexId; Oid distPlacementPlacementidIndexId; + Oid distColocationidIndexId; Oid distPlacementGroupidIndexId; Oid distTransactionRelationId; Oid distTransactionGroupIndexId; @@ -2506,6 +2507,17 @@ DistPlacementPlacementidIndexId(void) } +/* return oid of pg_dist_colocation_pkey */ +Oid +DistColocationIndexId(void) +{ + CachedRelationLookup("pg_dist_colocation_pkey", + &MetadataCache.distColocationidIndexId); + + return MetadataCache.distColocationidIndexId; +} + + /* return oid of pg_dist_transaction relation */ Oid DistTransactionRelationId(void) diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 25b3aea00..d138a54ce 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -3411,12 +3411,19 @@ ColocationGroupCreateCommandList(void) "distributioncolumncollationschema) AS (VALUES "); Relation pgDistColocation = table_open(DistColocationRelationId(), AccessShareLock); + Relation colocationIdIndexRel = index_open(DistColocationIndexId(), AccessShareLock); - bool indexOK = false; - SysScanDesc scanDescriptor = systable_beginscan(pgDistColocation, InvalidOid, indexOK, - NULL, 0, NULL); + /* + * It is not strictly necessary to read the tuples in order. + * However, it is useful to get consistent behavior, both for regression + * tests and also in production systems. + */ + SysScanDesc scanDescriptor = + systable_beginscan_ordered(pgDistColocation, colocationIdIndexRel, + NULL, 0, NULL); - HeapTuple colocationTuple = systable_getnext(scanDescriptor); + HeapTuple colocationTuple = systable_getnext_ordered(scanDescriptor, + ForwardScanDirection); while (HeapTupleIsValid(colocationTuple)) { @@ -3474,10 +3481,11 @@ ColocationGroupCreateCommandList(void) "NULL, NULL)"); } - colocationTuple = systable_getnext(scanDescriptor); + colocationTuple = systable_getnext_ordered(scanDescriptor, ForwardScanDirection); } - systable_endscan(scanDescriptor); + systable_endscan_ordered(scanDescriptor); + index_close(colocationIdIndexRel, AccessShareLock); table_close(pgDistColocation, AccessShareLock); if (!hasColocations) diff --git a/src/include/distributed/metadata/distobject.h b/src/include/distributed/metadata/distobject.h index 5ea04ec73..9511df4cf 100644 --- a/src/include/distributed/metadata/distobject.h +++ b/src/include/distributed/metadata/distobject.h @@ -24,6 +24,7 @@ extern bool IsObjectDistributed(const ObjectAddress *address); extern bool ClusterHasDistributedFunctionWithDistArgument(void); extern void MarkObjectDistributed(const ObjectAddress *distAddress); extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress); +extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress); extern void UnmarkObjectDistributed(const ObjectAddress *address); extern bool IsTableOwnedByExtension(Oid relationId); extern bool IsObjectAddressOwnedByExtension(const ObjectAddress *target, diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index e190aef6f..972849d53 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -238,6 +238,7 @@ extern Oid DistShardLogicalRelidIndexId(void); extern Oid DistShardShardidIndexId(void); extern Oid DistPlacementShardidIndexId(void); extern Oid DistPlacementPlacementidIndexId(void); +extern Oid DistColocationIndexId(void); extern Oid DistTransactionRelationId(void); extern Oid DistTransactionGroupIndexId(void); extern Oid DistPlacementGroupidIndexId(void); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index d1ac7216a..f613802fa 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -23,6 +23,7 @@ #include "catalog/objectaddress.h" #include "distributed/citus_nodes.h" #include "distributed/connection_management.h" +#include "distributed/errormessage.h" #include "distributed/relay_utility.h" #include "utils/acl.h" #include "utils/relcache.h" @@ -258,6 +259,9 @@ extern void CreateTruncateTrigger(Oid relationId); extern TableConversionReturn * UndistributeTable(TableConversionParameters *params); extern void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target); +extern DeferredErrorMessage * DeferErrorIfCircularDependencyExists(const + ObjectAddress * + objectAddress); extern List * GetDistributableDependenciesForObject(const ObjectAddress *target); extern bool ShouldPropagate(void); extern bool ShouldPropagateCreateInCoordinatedTransction(void); diff --git a/src/test/regress/expected/upgrade_post_11_after.out b/src/test/regress/expected/upgrade_post_11_after.out index b38be95c7..a52c5a2e0 100644 --- a/src/test/regress/expected/upgrade_post_11_after.out +++ b/src/test/regress/expected/upgrade_post_11_after.out @@ -9,24 +9,37 @@ NOTICE: Preparing to sync the metadata to all nodes t (1 row) --- tables are objects with Citus 11+ -SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype) ORDER BY 1; +-- tables, views and their dependencies become objects with Citus 11+ +SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype, 'post_11_upgrade.employees'::regclass, 'post_11_upgrade.view_for_upgrade_test'::regclass, 'post_11_upgrade.my_type_for_view'::regtype, 'post_11_upgrade.view_for_upgrade_test_my_type'::regclass, 'post_11_upgrade.non_dist_table_for_view'::regclass, 'post_11_upgrade.non_dist_upgrade_test_view'::regclass, 'post_11_upgrade.non_dist_upgrade_test_view_local_join'::regclass, 'post_11_upgrade.non_dist_upgrade_multiple_dist_view'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view_2'::regclass, 'post_11_upgrade.reporting_line'::regclass, 'post_11_upgrade.v_test_1'::regclass, 'post_11_upgrade.v_test_2'::regclass, 'post_11_upgrade.owned_by_extension_table'::regclass, 'post_11_upgrade.materialized_view'::regclass, 'post_11_upgrade.owned_by_extension_view'::regclass, 'post_11_upgrade.local_type'::regtype, 'post_11_upgrade.non_dist_dist_table_for_view'::regclass, 'post_11_upgrade.depends_on_nothing_1'::regclass, 'post_11_upgrade.depends_on_nothing_2'::regclass, 'post_11_upgrade.depends_on_pg'::regclass, 'post_11_upgrade.depends_on_citus'::regclass, 'post_11_upgrade.depends_on_seq'::regclass, 'post_11_upgrade.depends_on_seq_and_no_support'::regclass) ORDER BY 1; pg_identify_object_as_address --------------------------------------------------------------------- (function,"{post_11_upgrade,func_in_transaction_def}",{}) (schema,{post_11_upgrade},{}) + (table,"{post_11_upgrade,employees}",{}) (table,"{post_11_upgrade,part_table}",{}) (table,"{post_11_upgrade,sensors}",{}) ("text search configuration","{post_11_upgrade,partial_index_test_config}",{}) (type,{post_11_upgrade.my_type},{}) -(6 rows) + (type,{post_11_upgrade.my_type_for_view},{}) + (view,"{post_11_upgrade,depends_on_citus}",{}) + (view,"{post_11_upgrade,depends_on_nothing_1}",{}) + (view,"{post_11_upgrade,depends_on_nothing_2}",{}) + (view,"{post_11_upgrade,depends_on_pg}",{}) + (view,"{post_11_upgrade,depends_on_seq}",{}) + (view,"{post_11_upgrade,non_dist_upgrade_multiple_dist_view}",{}) + (view,"{post_11_upgrade,non_dist_upgrade_ref_view}",{}) + (view,"{post_11_upgrade,non_dist_upgrade_ref_view_2}",{}) + (view,"{post_11_upgrade,reporting_line}",{}) + (view,"{post_11_upgrade,view_for_upgrade_test}",{}) + (view,"{post_11_upgrade,view_for_upgrade_test_my_type}",{}) +(19 rows) -- on all nodes -SELECT run_command_on_workers($$SELECT array_agg(pg_identify_object_as_address(classid, objid, objsubid)) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype) ORDER BY 1;$$) ORDER BY 1; - run_command_on_workers +SELECT run_command_on_workers($$SELECT array_agg(pg_identify_object_as_address(classid, objid, objsubid)) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype, 'post_11_upgrade.view_for_upgrade_test'::regclass, 'post_11_upgrade.view_for_upgrade_test_my_type'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view_2'::regclass, 'post_11_upgrade.reporting_line'::regclass) ORDER BY 1;$$) ORDER BY 1; + run_command_on_workers --------------------------------------------------------------------- - (localhost,57636,t,"{""(type,{post_11_upgrade.my_type},{})"",""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})""}") - (localhost,57637,t,"{""(type,{post_11_upgrade.my_type},{})"",""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})""}") + (localhost,57636,t,"{""(type,{post_11_upgrade.my_type},{})"",""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test_my_type}\\"",{})"",""(view,\\""{post_11_upgrade,non_dist_upgrade_ref_view_2}\\"",{})"",""(view,\\""{post_11_upgrade,reporting_line}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})""}") + (localhost,57637,t,"{""(type,{post_11_upgrade.my_type},{})"",""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test_my_type}\\"",{})"",""(view,\\""{post_11_upgrade,non_dist_upgrade_ref_view_2}\\"",{})"",""(view,\\""{post_11_upgrade,reporting_line}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})""}") (2 rows) -- Create the necessary test utility function @@ -68,7 +81,9 @@ UNION EXCEPT SELECT unnest(activate_node_snapshot()) as command ) -) AS foo WHERE command NOT ILIKE '%distributed_object_data%'; +) AS foo WHERE command NOT ILIKE '%distributed_object_data%' and +-- sequences differ per node, so exclude +command NOT ILIKE '%sequence%'; same_metadata_in_workers --------------------------------------------------------------------- t diff --git a/src/test/regress/expected/upgrade_post_11_before.out b/src/test/regress/expected/upgrade_post_11_before.out index 37bbab11b..3ae8f3d8a 100644 --- a/src/test/regress/expected/upgrade_post_11_before.out +++ b/src/test/regress/expected/upgrade_post_11_before.out @@ -114,6 +114,42 @@ INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i; INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i; INSERT INTO colocated_partitioned_table SELECT i, '2020-01-05' FROM generate_series(0,1000)i; INSERT INTO sensors SELECT i, '2020-01-05', '{}' FROM generate_series(0,1000)i; +-- table for recursive view +CREATE TABLE employees (employee_id int, manager_id int, full_name text); +SELECT create_distributed_table('employees', 'employee_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- table for owned_by_extension +-- note that tables owned by extension are +-- not added to the pg_dist_object, and assumed +-- to exists on all nodes via the extension +CREATE TABLE owned_by_extension_table (employee_id int, manager_id int, full_name text); +ALTER EXTENSION plpgsql ADD TABLE post_11_upgrade.owned_by_extension_table; +NOTICE: Citus does not propagate adding/dropping member objects +HINT: You can add/drop the member objects on the workers as well. +SELECT create_distributed_table('owned_by_extension_table', 'employee_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT run_command_on_workers($$CREATE TABLE post_11_upgrade.owned_by_extension_table (employee_id int, manager_id int, full_name text);$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57636,t,"CREATE TABLE") + (localhost,57637,t,"CREATE TABLE") +(2 rows) + +SELECT run_command_on_workers($$ALTER EXTENSION plpgsql ADD TABLE post_11_upgrade.owned_by_extension_table;$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57636,t,"ALTER EXTENSION") + (localhost,57637,t,"ALTER EXTENSION") +(2 rows) + SET citus.enable_ddl_propagation TO off; CREATE TEXT SEARCH CONFIGURATION post_11_upgrade.partial_index_test_config ( parser = default ); SELECT 1 FROM run_command_on_workers($$CREATE TEXT SEARCH CONFIGURATION post_11_upgrade.partial_index_test_config ( parser = default );$$); @@ -147,6 +183,72 @@ $$;'); (2 rows) CREATE TYPE post_11_upgrade.my_type AS (a int); +CREATE VIEW post_11_upgrade.view_for_upgrade_test AS SELECT * FROM sensors; +-- one normally would not need views on the workers pre-11, but still +-- nice test to have +SELECT run_command_on_workers('SET citus.enable_ddl_propagation TO off; +CREATE VIEW post_11_upgrade.view_for_upgrade_test AS SELECT * FROM sensors;'); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57636,t,SET) + (localhost,57637,t,SET) +(2 rows) + +-- a non-distributed type dependency to a view +-- both the view and the type should be distributed after the upgrade +CREATE TYPE post_11_upgrade.my_type_for_view AS (a int); +CREATE VIEW post_11_upgrade.view_for_upgrade_test_my_type (casted) AS SELECT row(measureid)::post_11_upgrade.my_type_for_view FROM sensors; +-- a local type, table and view, should not be distributed +-- after the upgrade +CREATE TYPE post_11_upgrade.local_type AS (a int); +CREATE TABLE post_11_upgrade.non_dist_table_for_view(a int, b post_11_upgrade.local_type); +CREATE VIEW post_11_upgrade.non_dist_upgrade_test_view AS SELECT * FROM non_dist_table_for_view; +-- a local table joined with a distributed table. In other words, the view has a local table dependency +-- and should not be distributed after the upgrade +CREATE TABLE post_11_upgrade.non_dist_dist_table_for_view(a int); +CREATE VIEW post_11_upgrade.non_dist_upgrade_test_view_local_join AS SELECT * FROM non_dist_table_for_view JOIN sensors ON (true); +-- a view selecting from multiple +-- distributed/reference tables should be marked as distributed +CREATE VIEW post_11_upgrade.non_dist_upgrade_multiple_dist_view AS SELECT colocated_dist_table.* FROM colocated_dist_table JOIN sensors ON (true) JOIN reference_table ON (true); +-- a view selecting from reference table should be fine +CREATE VIEW post_11_upgrade.non_dist_upgrade_ref_view AS SELECT * FROM reference_table; +-- a view selecting from another (distributed) view should also be distributed +CREATE VIEW post_11_upgrade.non_dist_upgrade_ref_view_2 AS SELECT * FROM non_dist_upgrade_ref_view; +-- materialized views never becomes distributed +CREATE MATERIALIZED VIEW post_11_upgrade.materialized_view AS SELECT * FROM reference_table; +CREATE VIEW post_11_upgrade.owned_by_extension_view AS SELECT * FROM reference_table; +ALTER EXTENSION plpgsql ADD VIEW post_11_upgrade.owned_by_extension_view; +-- temporary views should not be marked as distributed +CREATE VIEW pg_temp.temp_view_1 AS SELECT * FROM reference_table; +CREATE temporary VIEW temp_view_2 AS SELECT * FROM reference_table; +-- we should be able to distribute recursive views +CREATE OR REPLACE RECURSIVE VIEW reporting_line (employee_id, subordinates) AS +SELECT employee_id, + full_name AS subordinates +FROM employees +WHERE manager_id IS NULL +UNION ALL +SELECT e.employee_id, + (rl.subordinates || ' > ' || e.full_name) AS subordinates +FROM employees e +INNER JOIN reporting_line rl ON e.manager_id = rl.employee_id; +-- v_test_1 and v_test_2 becomes circularly dependend views +-- so we should not try to distribute any of the views +CREATE VIEW post_11_upgrade.v_test_1 AS SELECT * FROM sensors; +CREATE VIEW post_11_upgrade.v_test_2 AS SELECT * FROM sensors; +CREATE OR REPLACE VIEW post_11_upgrade.v_test_1 AS SELECT sensors.* FROM sensors JOIN v_test_2 USING (measureid); +CREATE OR REPLACE VIEW post_11_upgrade.v_test_2 AS SELECT sensors.* FROM sensors JOIN v_test_1 USING (measureid); +-- views that do not depeend on anything should be distributed +CREATE VIEW post_11_upgrade.depends_on_nothing_1 AS SELECT * FROM (VALUES (1)) as values; +CREATE VIEW post_11_upgrade.depends_on_nothing_2 AS SELECT 1; +-- views depends pg/citus objects should be distributed +CREATE VIEW post_11_upgrade.depends_on_pg AS SELECT * FROM pg_class; +CREATE VIEW post_11_upgrade.depends_on_citus AS SELECT * FROM pg_dist_partition; +-- views depend on sequences only should be distributed +CREATE SEQUENCE post_11_upgrade.seq_bigint AS bigint INCREMENT BY 3 CACHE 10 CYCLE; +CREATE VIEW post_11_upgrade.depends_on_seq AS SELECT nextval('post_11_upgrade.seq_bigint'); +-- views depend on a sequence and a local table should not be distributed +CREATE VIEW post_11_upgrade.depends_on_seq_and_no_support AS SELECT nextval('post_11_upgrade.seq_bigint') FROM post_11_upgrade.non_dist_table_for_view; RESET citus.enable_ddl_propagation; CREATE TABLE sensors_parser( measureid integer, diff --git a/src/test/regress/sql/upgrade_post_11_after.sql b/src/test/regress/sql/upgrade_post_11_after.sql index a106b9fcf..71c15614f 100644 --- a/src/test/regress/sql/upgrade_post_11_after.sql +++ b/src/test/regress/sql/upgrade_post_11_after.sql @@ -4,11 +4,11 @@ SET search_path = post_11_upgrade; UPDATE pg_dist_node_metadata SET metadata=jsonb_set(metadata, '{partitioned_citus_table_exists_pre_11}', to_jsonb('true'::bool), true); SELECT citus_finalize_upgrade_to_citus11(enforce_version_check:=false); --- tables are objects with Citus 11+ -SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype) ORDER BY 1; +-- tables, views and their dependencies become objects with Citus 11+ +SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype, 'post_11_upgrade.employees'::regclass, 'post_11_upgrade.view_for_upgrade_test'::regclass, 'post_11_upgrade.my_type_for_view'::regtype, 'post_11_upgrade.view_for_upgrade_test_my_type'::regclass, 'post_11_upgrade.non_dist_table_for_view'::regclass, 'post_11_upgrade.non_dist_upgrade_test_view'::regclass, 'post_11_upgrade.non_dist_upgrade_test_view_local_join'::regclass, 'post_11_upgrade.non_dist_upgrade_multiple_dist_view'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view_2'::regclass, 'post_11_upgrade.reporting_line'::regclass, 'post_11_upgrade.v_test_1'::regclass, 'post_11_upgrade.v_test_2'::regclass, 'post_11_upgrade.owned_by_extension_table'::regclass, 'post_11_upgrade.materialized_view'::regclass, 'post_11_upgrade.owned_by_extension_view'::regclass, 'post_11_upgrade.local_type'::regtype, 'post_11_upgrade.non_dist_dist_table_for_view'::regclass, 'post_11_upgrade.depends_on_nothing_1'::regclass, 'post_11_upgrade.depends_on_nothing_2'::regclass, 'post_11_upgrade.depends_on_pg'::regclass, 'post_11_upgrade.depends_on_citus'::regclass, 'post_11_upgrade.depends_on_seq'::regclass, 'post_11_upgrade.depends_on_seq_and_no_support'::regclass) ORDER BY 1; -- on all nodes -SELECT run_command_on_workers($$SELECT array_agg(pg_identify_object_as_address(classid, objid, objsubid)) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype) ORDER BY 1;$$) ORDER BY 1; +SELECT run_command_on_workers($$SELECT array_agg(pg_identify_object_as_address(classid, objid, objsubid)) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype, 'post_11_upgrade.view_for_upgrade_test'::regclass, 'post_11_upgrade.view_for_upgrade_test_my_type'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view_2'::regclass, 'post_11_upgrade.reporting_line'::regclass) ORDER BY 1;$$) ORDER BY 1; -- Create the necessary test utility function CREATE OR REPLACE FUNCTION activate_node_snapshot() @@ -39,4 +39,6 @@ UNION EXCEPT SELECT unnest(activate_node_snapshot()) as command ) -) AS foo WHERE command NOT ILIKE '%distributed_object_data%'; +) AS foo WHERE command NOT ILIKE '%distributed_object_data%' and +-- sequences differ per node, so exclude +command NOT ILIKE '%sequence%'; diff --git a/src/test/regress/sql/upgrade_post_11_before.sql b/src/test/regress/sql/upgrade_post_11_before.sql index 959b026f8..abd61d44d 100644 --- a/src/test/regress/sql/upgrade_post_11_before.sql +++ b/src/test/regress/sql/upgrade_post_11_before.sql @@ -104,6 +104,19 @@ INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i; INSERT INTO colocated_partitioned_table SELECT i, '2020-01-05' FROM generate_series(0,1000)i; INSERT INTO sensors SELECT i, '2020-01-05', '{}' FROM generate_series(0,1000)i; +-- table for recursive view +CREATE TABLE employees (employee_id int, manager_id int, full_name text); +SELECT create_distributed_table('employees', 'employee_id'); + +-- table for owned_by_extension +-- note that tables owned by extension are +-- not added to the pg_dist_object, and assumed +-- to exists on all nodes via the extension +CREATE TABLE owned_by_extension_table (employee_id int, manager_id int, full_name text); +ALTER EXTENSION plpgsql ADD TABLE post_11_upgrade.owned_by_extension_table; +SELECT create_distributed_table('owned_by_extension_table', 'employee_id'); +SELECT run_command_on_workers($$CREATE TABLE post_11_upgrade.owned_by_extension_table (employee_id int, manager_id int, full_name text);$$); +SELECT run_command_on_workers($$ALTER EXTENSION plpgsql ADD TABLE post_11_upgrade.owned_by_extension_table;$$); SET citus.enable_ddl_propagation TO off; CREATE TEXT SEARCH CONFIGURATION post_11_upgrade.partial_index_test_config ( parser = default ); @@ -129,6 +142,83 @@ END; $$;'); CREATE TYPE post_11_upgrade.my_type AS (a int); +CREATE VIEW post_11_upgrade.view_for_upgrade_test AS SELECT * FROM sensors; + +-- one normally would not need views on the workers pre-11, but still +-- nice test to have +SELECT run_command_on_workers('SET citus.enable_ddl_propagation TO off; +CREATE VIEW post_11_upgrade.view_for_upgrade_test AS SELECT * FROM sensors;'); + + +-- a non-distributed type dependency to a view +-- both the view and the type should be distributed after the upgrade +CREATE TYPE post_11_upgrade.my_type_for_view AS (a int); +CREATE VIEW post_11_upgrade.view_for_upgrade_test_my_type (casted) AS SELECT row(measureid)::post_11_upgrade.my_type_for_view FROM sensors; + +-- a local type, table and view, should not be distributed +-- after the upgrade +CREATE TYPE post_11_upgrade.local_type AS (a int); +CREATE TABLE post_11_upgrade.non_dist_table_for_view(a int, b post_11_upgrade.local_type); +CREATE VIEW post_11_upgrade.non_dist_upgrade_test_view AS SELECT * FROM non_dist_table_for_view; + +-- a local table joined with a distributed table. In other words, the view has a local table dependency +-- and should not be distributed after the upgrade +CREATE TABLE post_11_upgrade.non_dist_dist_table_for_view(a int); +CREATE VIEW post_11_upgrade.non_dist_upgrade_test_view_local_join AS SELECT * FROM non_dist_table_for_view JOIN sensors ON (true); + +-- a view selecting from multiple +-- distributed/reference tables should be marked as distributed +CREATE VIEW post_11_upgrade.non_dist_upgrade_multiple_dist_view AS SELECT colocated_dist_table.* FROM colocated_dist_table JOIN sensors ON (true) JOIN reference_table ON (true); + +-- a view selecting from reference table should be fine +CREATE VIEW post_11_upgrade.non_dist_upgrade_ref_view AS SELECT * FROM reference_table; + +-- a view selecting from another (distributed) view should also be distributed +CREATE VIEW post_11_upgrade.non_dist_upgrade_ref_view_2 AS SELECT * FROM non_dist_upgrade_ref_view; + +-- materialized views never becomes distributed +CREATE MATERIALIZED VIEW post_11_upgrade.materialized_view AS SELECT * FROM reference_table; + +CREATE VIEW post_11_upgrade.owned_by_extension_view AS SELECT * FROM reference_table; +ALTER EXTENSION plpgsql ADD VIEW post_11_upgrade.owned_by_extension_view; + +-- temporary views should not be marked as distributed +CREATE VIEW pg_temp.temp_view_1 AS SELECT * FROM reference_table; +CREATE temporary VIEW temp_view_2 AS SELECT * FROM reference_table; + +-- we should be able to distribute recursive views +CREATE OR REPLACE RECURSIVE VIEW reporting_line (employee_id, subordinates) AS +SELECT employee_id, + full_name AS subordinates +FROM employees +WHERE manager_id IS NULL +UNION ALL +SELECT e.employee_id, + (rl.subordinates || ' > ' || e.full_name) AS subordinates +FROM employees e +INNER JOIN reporting_line rl ON e.manager_id = rl.employee_id; + +-- v_test_1 and v_test_2 becomes circularly dependend views +-- so we should not try to distribute any of the views +CREATE VIEW post_11_upgrade.v_test_1 AS SELECT * FROM sensors; +CREATE VIEW post_11_upgrade.v_test_2 AS SELECT * FROM sensors; +CREATE OR REPLACE VIEW post_11_upgrade.v_test_1 AS SELECT sensors.* FROM sensors JOIN v_test_2 USING (measureid); +CREATE OR REPLACE VIEW post_11_upgrade.v_test_2 AS SELECT sensors.* FROM sensors JOIN v_test_1 USING (measureid); + +-- views that do not depeend on anything should be distributed +CREATE VIEW post_11_upgrade.depends_on_nothing_1 AS SELECT * FROM (VALUES (1)) as values; +CREATE VIEW post_11_upgrade.depends_on_nothing_2 AS SELECT 1; + +-- views depends pg/citus objects should be distributed +CREATE VIEW post_11_upgrade.depends_on_pg AS SELECT * FROM pg_class; +CREATE VIEW post_11_upgrade.depends_on_citus AS SELECT * FROM pg_dist_partition; + +-- views depend on sequences only should be distributed +CREATE SEQUENCE post_11_upgrade.seq_bigint AS bigint INCREMENT BY 3 CACHE 10 CYCLE; +CREATE VIEW post_11_upgrade.depends_on_seq AS SELECT nextval('post_11_upgrade.seq_bigint'); + +-- views depend on a sequence and a local table should not be distributed +CREATE VIEW post_11_upgrade.depends_on_seq_and_no_support AS SELECT nextval('post_11_upgrade.seq_bigint') FROM post_11_upgrade.non_dist_table_for_view; RESET citus.enable_ddl_propagation;