mirror of https://github.com/citusdata/citus.git
Merge pull request #5923 from citusdata/update_view
Mark existing views as distributed when upgrade to 11.0+pull/5931/head
commit
b04222155d
|
@ -166,11 +166,28 @@ EnsureDependenciesCanBeDistributed(const ObjectAddress *objectAddress)
|
|||
|
||||
|
||||
/*
|
||||
* ErrorIfCircularDependencyExists checks whether given object has circular dependency
|
||||
* with itself via existing objects of pg_dist_object.
|
||||
* ErrorIfCircularDependencyExists is a wrapper around
|
||||
* DeferErrorIfCircularDependencyExists(), and throws error
|
||||
* if circular dependency exists.
|
||||
*/
|
||||
static void
|
||||
ErrorIfCircularDependencyExists(const ObjectAddress *objectAddress)
|
||||
{
|
||||
DeferredErrorMessage *depError =
|
||||
DeferErrorIfCircularDependencyExists(objectAddress);
|
||||
if (depError != NULL)
|
||||
{
|
||||
RaiseDeferredError(depError, ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DeferErrorIfCircularDependencyExists checks whether given object has
|
||||
* circular dependency with itself via existing objects of pg_dist_object.
|
||||
*/
|
||||
DeferredErrorMessage *
|
||||
DeferErrorIfCircularDependencyExists(const ObjectAddress *objectAddress)
|
||||
{
|
||||
List *dependencies = GetAllSupportedDependenciesForObject(objectAddress);
|
||||
|
||||
|
@ -189,13 +206,18 @@ ErrorIfCircularDependencyExists(const ObjectAddress *objectAddress)
|
|||
objectDescription = getObjectDescription(objectAddress);
|
||||
#endif
|
||||
|
||||
ereport(ERROR, (errmsg("Citus can not handle circular dependencies "
|
||||
"between distributed objects"),
|
||||
errdetail("\"%s\" circularly depends itself, resolve "
|
||||
"circular dependency first",
|
||||
objectDescription)));
|
||||
StringInfo detailInfo = makeStringInfo();
|
||||
appendStringInfo(detailInfo, "\"%s\" circularly depends itself, resolve "
|
||||
"circular dependency first", objectDescription);
|
||||
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"Citus can not handle circular dependencies "
|
||||
"between distributed objects", detailInfo->data,
|
||||
NULL);
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "access/genam.h"
|
||||
#include "citus_version.h"
|
||||
#include "catalog/pg_extension_d.h"
|
||||
#include "commands/defrem.h"
|
||||
|
@ -37,6 +38,8 @@ static void AddSchemaFieldIfMissing(CreateExtensionStmt *stmt);
|
|||
static List * FilterDistributedExtensions(List *extensionObjectList);
|
||||
static List * ExtensionNameListToObjectAddressList(List *extensionObjectList);
|
||||
static void MarkExistingObjectDependenciesDistributedIfSupported(void);
|
||||
static List * GetAllViews(void);
|
||||
static bool ShouldMarkRelationDistributedOnUpgrade(Oid relationId);
|
||||
static bool ShouldPropagateExtensionCommand(Node *parseTree);
|
||||
static bool IsAlterExtensionSetSchemaCitus(Node *parseTree);
|
||||
static Node * RecreateExtensionStmt(Oid extensionOid);
|
||||
|
@ -510,27 +513,78 @@ MarkExistingObjectDependenciesDistributedIfSupported()
|
|||
Oid citusTableId = InvalidOid;
|
||||
foreach_oid(citusTableId, citusTableIdList)
|
||||
{
|
||||
ObjectAddress tableAddress = { 0 };
|
||||
ObjectAddressSet(tableAddress, RelationRelationId, citusTableId);
|
||||
if (!ShouldMarkRelationDistributedOnUpgrade(citusTableId))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
/* refrain reading the metadata cache for all tables */
|
||||
if (ShouldSyncTableMetadataViaCatalog(citusTableId))
|
||||
{
|
||||
/* we need to pass pointer allocated in the heap */
|
||||
ObjectAddress *addressPointer = palloc0(sizeof(ObjectAddress));
|
||||
*addressPointer = tableAddress;
|
||||
ObjectAddress tableAddress = { 0 };
|
||||
ObjectAddressSet(tableAddress, RelationRelationId, citusTableId);
|
||||
|
||||
/* as of Citus 11, tables that should be synced are also considered object */
|
||||
resultingObjectAddresses = lappend(resultingObjectAddresses, addressPointer);
|
||||
/*
|
||||
* We mark tables distributed immediately because we also need to mark
|
||||
* views as distributed. We check whether the views that depend on
|
||||
* the table has any auto-distirbutable dependencies below. Citus
|
||||
* currently cannot "auto" distribute tables as dependencies, so we
|
||||
* mark them distributed immediately.
|
||||
*/
|
||||
MarkObjectDistributedLocally(&tableAddress);
|
||||
|
||||
/*
|
||||
* All the distributable dependencies of a table should be marked as
|
||||
* distributed.
|
||||
*/
|
||||
List *distributableDependencyObjectAddresses =
|
||||
GetDistributableDependenciesForObject(&tableAddress);
|
||||
|
||||
resultingObjectAddresses =
|
||||
list_concat(resultingObjectAddresses,
|
||||
distributableDependencyObjectAddresses);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* As of Citus 11, views on Citus tables that do not have any unsupported
|
||||
* dependency should also be distributed.
|
||||
*
|
||||
* In general, we mark views distributed as long as it does not have
|
||||
* any unsupported dependencies.
|
||||
*/
|
||||
List *viewList = GetAllViews();
|
||||
Oid viewOid = InvalidOid;
|
||||
foreach_oid(viewOid, viewList)
|
||||
{
|
||||
if (!ShouldMarkRelationDistributedOnUpgrade(viewOid))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
List *distributableDependencyObjectAddresses =
|
||||
GetDistributableDependenciesForObject(&tableAddress);
|
||||
ObjectAddress viewAddress = { 0 };
|
||||
ObjectAddressSet(viewAddress, RelationRelationId, viewOid);
|
||||
|
||||
resultingObjectAddresses = list_concat(resultingObjectAddresses,
|
||||
distributableDependencyObjectAddresses);
|
||||
/*
|
||||
* If a view depends on multiple views, that view will be marked
|
||||
* as distributed while it is processed for the last view
|
||||
* table.
|
||||
*/
|
||||
MarkObjectDistributedLocally(&viewAddress);
|
||||
|
||||
/* we need to pass pointer allocated in the heap */
|
||||
ObjectAddress *addressPointer = palloc0(sizeof(ObjectAddress));
|
||||
*addressPointer = viewAddress;
|
||||
|
||||
List *distributableDependencyObjectAddresses =
|
||||
GetDistributableDependenciesForObject(&viewAddress);
|
||||
|
||||
resultingObjectAddresses =
|
||||
list_concat(resultingObjectAddresses,
|
||||
distributableDependencyObjectAddresses);
|
||||
}
|
||||
|
||||
|
||||
/* resolve dependencies of the objects in pg_dist_object*/
|
||||
List *distributedObjectAddressList = GetDistributedObjectAddressList();
|
||||
|
||||
|
@ -566,6 +620,85 @@ MarkExistingObjectDependenciesDistributedIfSupported()
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetAllViews returns list of view oids that exists on this server.
|
||||
*/
|
||||
static List *
|
||||
GetAllViews(void)
|
||||
{
|
||||
List *viewOidList = NIL;
|
||||
|
||||
Relation pgClass = table_open(RelationRelationId, AccessShareLock);
|
||||
|
||||
SysScanDesc scanDescriptor = systable_beginscan(pgClass, InvalidOid, false, NULL,
|
||||
0, NULL);
|
||||
|
||||
HeapTuple heapTuple = systable_getnext(scanDescriptor);
|
||||
while (HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
Form_pg_class relationForm = (Form_pg_class) GETSTRUCT(heapTuple);
|
||||
|
||||
/* we're only interested in views */
|
||||
if (relationForm->relkind == RELKIND_VIEW)
|
||||
{
|
||||
viewOidList = lappend_oid(viewOidList, relationForm->oid);
|
||||
}
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
}
|
||||
|
||||
systable_endscan(scanDescriptor);
|
||||
table_close(pgClass, NoLock);
|
||||
|
||||
return viewOidList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ShouldMarkRelationDistributedOnUpgrade is a helper function that
|
||||
* decides whether the input relation should be marked as distributed
|
||||
* during the upgrade.
|
||||
*/
|
||||
static bool
|
||||
ShouldMarkRelationDistributedOnUpgrade(Oid relationId)
|
||||
{
|
||||
if (!EnableMetadataSync)
|
||||
{
|
||||
/*
|
||||
* Just in case anything goes wrong, we should still be able
|
||||
* to continue to the version upgrade.
|
||||
*/
|
||||
return false;
|
||||
}
|
||||
|
||||
ObjectAddress relationAddress = { 0 };
|
||||
ObjectAddressSet(relationAddress, RelationRelationId, relationId);
|
||||
|
||||
bool pgObject = (relationId < FirstNormalObjectId);
|
||||
bool ownedByExtension = IsTableOwnedByExtension(relationId);
|
||||
bool alreadyDistributed = IsObjectDistributed(&relationAddress);
|
||||
bool hasUnsupportedDependency =
|
||||
DeferErrorIfHasUnsupportedDependency(&relationAddress) != NULL;
|
||||
bool hasCircularDependency =
|
||||
DeferErrorIfCircularDependencyExists(&relationAddress) != NULL;
|
||||
|
||||
/*
|
||||
* pgObject: Citus never marks pg objects as distributed
|
||||
* ownedByExtension: let extensions manage its own objects
|
||||
* alreadyDistributed: most likely via earlier versions
|
||||
* hasUnsupportedDependency: Citus doesn't know how to distribute its dependencies
|
||||
* hasCircularDependency: Citus cannot handle circular dependencies
|
||||
*/
|
||||
if (pgObject || ownedByExtension || alreadyDistributed ||
|
||||
hasUnsupportedDependency || hasCircularDependency)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* PreprocessAlterExtensionContentsStmt issues a notice. It does not propagate.
|
||||
*/
|
||||
|
|
|
@ -46,7 +46,6 @@
|
|||
#include "utils/rel.h"
|
||||
|
||||
|
||||
static void MarkObjectDistributedLocally(const ObjectAddress *distAddress);
|
||||
static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress);
|
||||
static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
|
||||
Datum *paramValues);
|
||||
|
@ -195,7 +194,7 @@ MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress)
|
|||
* This function should never be called alone, MarkObjectDistributed() or
|
||||
* MarkObjectDistributedViaSuperUser() should be called.
|
||||
*/
|
||||
static void
|
||||
void
|
||||
MarkObjectDistributedLocally(const ObjectAddress *distAddress)
|
||||
{
|
||||
int paramCount = 3;
|
||||
|
|
|
@ -152,6 +152,7 @@ typedef struct MetadataCacheData
|
|||
Oid distShardShardidIndexId;
|
||||
Oid distPlacementShardidIndexId;
|
||||
Oid distPlacementPlacementidIndexId;
|
||||
Oid distColocationidIndexId;
|
||||
Oid distPlacementGroupidIndexId;
|
||||
Oid distTransactionRelationId;
|
||||
Oid distTransactionGroupIndexId;
|
||||
|
@ -2506,6 +2507,17 @@ DistPlacementPlacementidIndexId(void)
|
|||
}
|
||||
|
||||
|
||||
/* return oid of pg_dist_colocation_pkey */
|
||||
Oid
|
||||
DistColocationIndexId(void)
|
||||
{
|
||||
CachedRelationLookup("pg_dist_colocation_pkey",
|
||||
&MetadataCache.distColocationidIndexId);
|
||||
|
||||
return MetadataCache.distColocationidIndexId;
|
||||
}
|
||||
|
||||
|
||||
/* return oid of pg_dist_transaction relation */
|
||||
Oid
|
||||
DistTransactionRelationId(void)
|
||||
|
|
|
@ -3411,12 +3411,19 @@ ColocationGroupCreateCommandList(void)
|
|||
"distributioncolumncollationschema) AS (VALUES ");
|
||||
|
||||
Relation pgDistColocation = table_open(DistColocationRelationId(), AccessShareLock);
|
||||
Relation colocationIdIndexRel = index_open(DistColocationIndexId(), AccessShareLock);
|
||||
|
||||
bool indexOK = false;
|
||||
SysScanDesc scanDescriptor = systable_beginscan(pgDistColocation, InvalidOid, indexOK,
|
||||
NULL, 0, NULL);
|
||||
/*
|
||||
* It is not strictly necessary to read the tuples in order.
|
||||
* However, it is useful to get consistent behavior, both for regression
|
||||
* tests and also in production systems.
|
||||
*/
|
||||
SysScanDesc scanDescriptor =
|
||||
systable_beginscan_ordered(pgDistColocation, colocationIdIndexRel,
|
||||
NULL, 0, NULL);
|
||||
|
||||
HeapTuple colocationTuple = systable_getnext(scanDescriptor);
|
||||
HeapTuple colocationTuple = systable_getnext_ordered(scanDescriptor,
|
||||
ForwardScanDirection);
|
||||
|
||||
while (HeapTupleIsValid(colocationTuple))
|
||||
{
|
||||
|
@ -3474,10 +3481,11 @@ ColocationGroupCreateCommandList(void)
|
|||
"NULL, NULL)");
|
||||
}
|
||||
|
||||
colocationTuple = systable_getnext(scanDescriptor);
|
||||
colocationTuple = systable_getnext_ordered(scanDescriptor, ForwardScanDirection);
|
||||
}
|
||||
|
||||
systable_endscan(scanDescriptor);
|
||||
systable_endscan_ordered(scanDescriptor);
|
||||
index_close(colocationIdIndexRel, AccessShareLock);
|
||||
table_close(pgDistColocation, AccessShareLock);
|
||||
|
||||
if (!hasColocations)
|
||||
|
|
|
@ -24,6 +24,7 @@ extern bool IsObjectDistributed(const ObjectAddress *address);
|
|||
extern bool ClusterHasDistributedFunctionWithDistArgument(void);
|
||||
extern void MarkObjectDistributed(const ObjectAddress *distAddress);
|
||||
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
|
||||
extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress);
|
||||
extern void UnmarkObjectDistributed(const ObjectAddress *address);
|
||||
extern bool IsTableOwnedByExtension(Oid relationId);
|
||||
extern bool IsObjectAddressOwnedByExtension(const ObjectAddress *target,
|
||||
|
|
|
@ -238,6 +238,7 @@ extern Oid DistShardLogicalRelidIndexId(void);
|
|||
extern Oid DistShardShardidIndexId(void);
|
||||
extern Oid DistPlacementShardidIndexId(void);
|
||||
extern Oid DistPlacementPlacementidIndexId(void);
|
||||
extern Oid DistColocationIndexId(void);
|
||||
extern Oid DistTransactionRelationId(void);
|
||||
extern Oid DistTransactionGroupIndexId(void);
|
||||
extern Oid DistPlacementGroupidIndexId(void);
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include "catalog/objectaddress.h"
|
||||
#include "distributed/citus_nodes.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/errormessage.h"
|
||||
#include "distributed/relay_utility.h"
|
||||
#include "utils/acl.h"
|
||||
#include "utils/relcache.h"
|
||||
|
@ -258,6 +259,9 @@ extern void CreateTruncateTrigger(Oid relationId);
|
|||
extern TableConversionReturn * UndistributeTable(TableConversionParameters *params);
|
||||
|
||||
extern void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target);
|
||||
extern DeferredErrorMessage * DeferErrorIfCircularDependencyExists(const
|
||||
ObjectAddress *
|
||||
objectAddress);
|
||||
extern List * GetDistributableDependenciesForObject(const ObjectAddress *target);
|
||||
extern bool ShouldPropagate(void);
|
||||
extern bool ShouldPropagateCreateInCoordinatedTransction(void);
|
||||
|
|
|
@ -9,24 +9,37 @@ NOTICE: Preparing to sync the metadata to all nodes
|
|||
t
|
||||
(1 row)
|
||||
|
||||
-- tables are objects with Citus 11+
|
||||
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype) ORDER BY 1;
|
||||
-- tables, views and their dependencies become objects with Citus 11+
|
||||
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype, 'post_11_upgrade.employees'::regclass, 'post_11_upgrade.view_for_upgrade_test'::regclass, 'post_11_upgrade.my_type_for_view'::regtype, 'post_11_upgrade.view_for_upgrade_test_my_type'::regclass, 'post_11_upgrade.non_dist_table_for_view'::regclass, 'post_11_upgrade.non_dist_upgrade_test_view'::regclass, 'post_11_upgrade.non_dist_upgrade_test_view_local_join'::regclass, 'post_11_upgrade.non_dist_upgrade_multiple_dist_view'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view_2'::regclass, 'post_11_upgrade.reporting_line'::regclass, 'post_11_upgrade.v_test_1'::regclass, 'post_11_upgrade.v_test_2'::regclass, 'post_11_upgrade.owned_by_extension_table'::regclass, 'post_11_upgrade.materialized_view'::regclass, 'post_11_upgrade.owned_by_extension_view'::regclass, 'post_11_upgrade.local_type'::regtype, 'post_11_upgrade.non_dist_dist_table_for_view'::regclass, 'post_11_upgrade.depends_on_nothing_1'::regclass, 'post_11_upgrade.depends_on_nothing_2'::regclass, 'post_11_upgrade.depends_on_pg'::regclass, 'post_11_upgrade.depends_on_citus'::regclass, 'post_11_upgrade.depends_on_seq'::regclass, 'post_11_upgrade.depends_on_seq_and_no_support'::regclass) ORDER BY 1;
|
||||
pg_identify_object_as_address
|
||||
---------------------------------------------------------------------
|
||||
(function,"{post_11_upgrade,func_in_transaction_def}",{})
|
||||
(schema,{post_11_upgrade},{})
|
||||
(table,"{post_11_upgrade,employees}",{})
|
||||
(table,"{post_11_upgrade,part_table}",{})
|
||||
(table,"{post_11_upgrade,sensors}",{})
|
||||
("text search configuration","{post_11_upgrade,partial_index_test_config}",{})
|
||||
(type,{post_11_upgrade.my_type},{})
|
||||
(6 rows)
|
||||
(type,{post_11_upgrade.my_type_for_view},{})
|
||||
(view,"{post_11_upgrade,depends_on_citus}",{})
|
||||
(view,"{post_11_upgrade,depends_on_nothing_1}",{})
|
||||
(view,"{post_11_upgrade,depends_on_nothing_2}",{})
|
||||
(view,"{post_11_upgrade,depends_on_pg}",{})
|
||||
(view,"{post_11_upgrade,depends_on_seq}",{})
|
||||
(view,"{post_11_upgrade,non_dist_upgrade_multiple_dist_view}",{})
|
||||
(view,"{post_11_upgrade,non_dist_upgrade_ref_view}",{})
|
||||
(view,"{post_11_upgrade,non_dist_upgrade_ref_view_2}",{})
|
||||
(view,"{post_11_upgrade,reporting_line}",{})
|
||||
(view,"{post_11_upgrade,view_for_upgrade_test}",{})
|
||||
(view,"{post_11_upgrade,view_for_upgrade_test_my_type}",{})
|
||||
(19 rows)
|
||||
|
||||
-- on all nodes
|
||||
SELECT run_command_on_workers($$SELECT array_agg(pg_identify_object_as_address(classid, objid, objsubid)) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype) ORDER BY 1;$$) ORDER BY 1;
|
||||
run_command_on_workers
|
||||
SELECT run_command_on_workers($$SELECT array_agg(pg_identify_object_as_address(classid, objid, objsubid)) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype, 'post_11_upgrade.view_for_upgrade_test'::regclass, 'post_11_upgrade.view_for_upgrade_test_my_type'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view_2'::regclass, 'post_11_upgrade.reporting_line'::regclass) ORDER BY 1;$$) ORDER BY 1;
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57636,t,"{""(type,{post_11_upgrade.my_type},{})"",""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})""}")
|
||||
(localhost,57637,t,"{""(type,{post_11_upgrade.my_type},{})"",""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})""}")
|
||||
(localhost,57636,t,"{""(type,{post_11_upgrade.my_type},{})"",""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test_my_type}\\"",{})"",""(view,\\""{post_11_upgrade,non_dist_upgrade_ref_view_2}\\"",{})"",""(view,\\""{post_11_upgrade,reporting_line}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})""}")
|
||||
(localhost,57637,t,"{""(type,{post_11_upgrade.my_type},{})"",""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test_my_type}\\"",{})"",""(view,\\""{post_11_upgrade,non_dist_upgrade_ref_view_2}\\"",{})"",""(view,\\""{post_11_upgrade,reporting_line}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})""}")
|
||||
(2 rows)
|
||||
|
||||
-- Create the necessary test utility function
|
||||
|
@ -68,7 +81,9 @@ UNION
|
|||
EXCEPT
|
||||
SELECT unnest(activate_node_snapshot()) as command
|
||||
)
|
||||
) AS foo WHERE command NOT ILIKE '%distributed_object_data%';
|
||||
) AS foo WHERE command NOT ILIKE '%distributed_object_data%' and
|
||||
-- sequences differ per node, so exclude
|
||||
command NOT ILIKE '%sequence%';
|
||||
same_metadata_in_workers
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
|
|
|
@ -114,6 +114,42 @@ INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
|
|||
INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
|
||||
INSERT INTO colocated_partitioned_table SELECT i, '2020-01-05' FROM generate_series(0,1000)i;
|
||||
INSERT INTO sensors SELECT i, '2020-01-05', '{}' FROM generate_series(0,1000)i;
|
||||
-- table for recursive view
|
||||
CREATE TABLE employees (employee_id int, manager_id int, full_name text);
|
||||
SELECT create_distributed_table('employees', 'employee_id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- table for owned_by_extension
|
||||
-- note that tables owned by extension are
|
||||
-- not added to the pg_dist_object, and assumed
|
||||
-- to exists on all nodes via the extension
|
||||
CREATE TABLE owned_by_extension_table (employee_id int, manager_id int, full_name text);
|
||||
ALTER EXTENSION plpgsql ADD TABLE post_11_upgrade.owned_by_extension_table;
|
||||
NOTICE: Citus does not propagate adding/dropping member objects
|
||||
HINT: You can add/drop the member objects on the workers as well.
|
||||
SELECT create_distributed_table('owned_by_extension_table', 'employee_id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT run_command_on_workers($$CREATE TABLE post_11_upgrade.owned_by_extension_table (employee_id int, manager_id int, full_name text);$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57636,t,"CREATE TABLE")
|
||||
(localhost,57637,t,"CREATE TABLE")
|
||||
(2 rows)
|
||||
|
||||
SELECT run_command_on_workers($$ALTER EXTENSION plpgsql ADD TABLE post_11_upgrade.owned_by_extension_table;$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57636,t,"ALTER EXTENSION")
|
||||
(localhost,57637,t,"ALTER EXTENSION")
|
||||
(2 rows)
|
||||
|
||||
SET citus.enable_ddl_propagation TO off;
|
||||
CREATE TEXT SEARCH CONFIGURATION post_11_upgrade.partial_index_test_config ( parser = default );
|
||||
SELECT 1 FROM run_command_on_workers($$CREATE TEXT SEARCH CONFIGURATION post_11_upgrade.partial_index_test_config ( parser = default );$$);
|
||||
|
@ -147,6 +183,72 @@ $$;');
|
|||
(2 rows)
|
||||
|
||||
CREATE TYPE post_11_upgrade.my_type AS (a int);
|
||||
CREATE VIEW post_11_upgrade.view_for_upgrade_test AS SELECT * FROM sensors;
|
||||
-- one normally would not need views on the workers pre-11, but still
|
||||
-- nice test to have
|
||||
SELECT run_command_on_workers('SET citus.enable_ddl_propagation TO off;
|
||||
CREATE VIEW post_11_upgrade.view_for_upgrade_test AS SELECT * FROM sensors;');
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57636,t,SET)
|
||||
(localhost,57637,t,SET)
|
||||
(2 rows)
|
||||
|
||||
-- a non-distributed type dependency to a view
|
||||
-- both the view and the type should be distributed after the upgrade
|
||||
CREATE TYPE post_11_upgrade.my_type_for_view AS (a int);
|
||||
CREATE VIEW post_11_upgrade.view_for_upgrade_test_my_type (casted) AS SELECT row(measureid)::post_11_upgrade.my_type_for_view FROM sensors;
|
||||
-- a local type, table and view, should not be distributed
|
||||
-- after the upgrade
|
||||
CREATE TYPE post_11_upgrade.local_type AS (a int);
|
||||
CREATE TABLE post_11_upgrade.non_dist_table_for_view(a int, b post_11_upgrade.local_type);
|
||||
CREATE VIEW post_11_upgrade.non_dist_upgrade_test_view AS SELECT * FROM non_dist_table_for_view;
|
||||
-- a local table joined with a distributed table. In other words, the view has a local table dependency
|
||||
-- and should not be distributed after the upgrade
|
||||
CREATE TABLE post_11_upgrade.non_dist_dist_table_for_view(a int);
|
||||
CREATE VIEW post_11_upgrade.non_dist_upgrade_test_view_local_join AS SELECT * FROM non_dist_table_for_view JOIN sensors ON (true);
|
||||
-- a view selecting from multiple
|
||||
-- distributed/reference tables should be marked as distributed
|
||||
CREATE VIEW post_11_upgrade.non_dist_upgrade_multiple_dist_view AS SELECT colocated_dist_table.* FROM colocated_dist_table JOIN sensors ON (true) JOIN reference_table ON (true);
|
||||
-- a view selecting from reference table should be fine
|
||||
CREATE VIEW post_11_upgrade.non_dist_upgrade_ref_view AS SELECT * FROM reference_table;
|
||||
-- a view selecting from another (distributed) view should also be distributed
|
||||
CREATE VIEW post_11_upgrade.non_dist_upgrade_ref_view_2 AS SELECT * FROM non_dist_upgrade_ref_view;
|
||||
-- materialized views never becomes distributed
|
||||
CREATE MATERIALIZED VIEW post_11_upgrade.materialized_view AS SELECT * FROM reference_table;
|
||||
CREATE VIEW post_11_upgrade.owned_by_extension_view AS SELECT * FROM reference_table;
|
||||
ALTER EXTENSION plpgsql ADD VIEW post_11_upgrade.owned_by_extension_view;
|
||||
-- temporary views should not be marked as distributed
|
||||
CREATE VIEW pg_temp.temp_view_1 AS SELECT * FROM reference_table;
|
||||
CREATE temporary VIEW temp_view_2 AS SELECT * FROM reference_table;
|
||||
-- we should be able to distribute recursive views
|
||||
CREATE OR REPLACE RECURSIVE VIEW reporting_line (employee_id, subordinates) AS
|
||||
SELECT employee_id,
|
||||
full_name AS subordinates
|
||||
FROM employees
|
||||
WHERE manager_id IS NULL
|
||||
UNION ALL
|
||||
SELECT e.employee_id,
|
||||
(rl.subordinates || ' > ' || e.full_name) AS subordinates
|
||||
FROM employees e
|
||||
INNER JOIN reporting_line rl ON e.manager_id = rl.employee_id;
|
||||
-- v_test_1 and v_test_2 becomes circularly dependend views
|
||||
-- so we should not try to distribute any of the views
|
||||
CREATE VIEW post_11_upgrade.v_test_1 AS SELECT * FROM sensors;
|
||||
CREATE VIEW post_11_upgrade.v_test_2 AS SELECT * FROM sensors;
|
||||
CREATE OR REPLACE VIEW post_11_upgrade.v_test_1 AS SELECT sensors.* FROM sensors JOIN v_test_2 USING (measureid);
|
||||
CREATE OR REPLACE VIEW post_11_upgrade.v_test_2 AS SELECT sensors.* FROM sensors JOIN v_test_1 USING (measureid);
|
||||
-- views that do not depeend on anything should be distributed
|
||||
CREATE VIEW post_11_upgrade.depends_on_nothing_1 AS SELECT * FROM (VALUES (1)) as values;
|
||||
CREATE VIEW post_11_upgrade.depends_on_nothing_2 AS SELECT 1;
|
||||
-- views depends pg/citus objects should be distributed
|
||||
CREATE VIEW post_11_upgrade.depends_on_pg AS SELECT * FROM pg_class;
|
||||
CREATE VIEW post_11_upgrade.depends_on_citus AS SELECT * FROM pg_dist_partition;
|
||||
-- views depend on sequences only should be distributed
|
||||
CREATE SEQUENCE post_11_upgrade.seq_bigint AS bigint INCREMENT BY 3 CACHE 10 CYCLE;
|
||||
CREATE VIEW post_11_upgrade.depends_on_seq AS SELECT nextval('post_11_upgrade.seq_bigint');
|
||||
-- views depend on a sequence and a local table should not be distributed
|
||||
CREATE VIEW post_11_upgrade.depends_on_seq_and_no_support AS SELECT nextval('post_11_upgrade.seq_bigint') FROM post_11_upgrade.non_dist_table_for_view;
|
||||
RESET citus.enable_ddl_propagation;
|
||||
CREATE TABLE sensors_parser(
|
||||
measureid integer,
|
||||
|
|
|
@ -4,11 +4,11 @@ SET search_path = post_11_upgrade;
|
|||
UPDATE pg_dist_node_metadata SET metadata=jsonb_set(metadata, '{partitioned_citus_table_exists_pre_11}', to_jsonb('true'::bool), true);
|
||||
SELECT citus_finalize_upgrade_to_citus11(enforce_version_check:=false);
|
||||
|
||||
-- tables are objects with Citus 11+
|
||||
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype) ORDER BY 1;
|
||||
-- tables, views and their dependencies become objects with Citus 11+
|
||||
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype, 'post_11_upgrade.employees'::regclass, 'post_11_upgrade.view_for_upgrade_test'::regclass, 'post_11_upgrade.my_type_for_view'::regtype, 'post_11_upgrade.view_for_upgrade_test_my_type'::regclass, 'post_11_upgrade.non_dist_table_for_view'::regclass, 'post_11_upgrade.non_dist_upgrade_test_view'::regclass, 'post_11_upgrade.non_dist_upgrade_test_view_local_join'::regclass, 'post_11_upgrade.non_dist_upgrade_multiple_dist_view'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view_2'::regclass, 'post_11_upgrade.reporting_line'::regclass, 'post_11_upgrade.v_test_1'::regclass, 'post_11_upgrade.v_test_2'::regclass, 'post_11_upgrade.owned_by_extension_table'::regclass, 'post_11_upgrade.materialized_view'::regclass, 'post_11_upgrade.owned_by_extension_view'::regclass, 'post_11_upgrade.local_type'::regtype, 'post_11_upgrade.non_dist_dist_table_for_view'::regclass, 'post_11_upgrade.depends_on_nothing_1'::regclass, 'post_11_upgrade.depends_on_nothing_2'::regclass, 'post_11_upgrade.depends_on_pg'::regclass, 'post_11_upgrade.depends_on_citus'::regclass, 'post_11_upgrade.depends_on_seq'::regclass, 'post_11_upgrade.depends_on_seq_and_no_support'::regclass) ORDER BY 1;
|
||||
|
||||
-- on all nodes
|
||||
SELECT run_command_on_workers($$SELECT array_agg(pg_identify_object_as_address(classid, objid, objsubid)) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype) ORDER BY 1;$$) ORDER BY 1;
|
||||
SELECT run_command_on_workers($$SELECT array_agg(pg_identify_object_as_address(classid, objid, objsubid)) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype, 'post_11_upgrade.view_for_upgrade_test'::regclass, 'post_11_upgrade.view_for_upgrade_test_my_type'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view_2'::regclass, 'post_11_upgrade.reporting_line'::regclass) ORDER BY 1;$$) ORDER BY 1;
|
||||
|
||||
-- Create the necessary test utility function
|
||||
CREATE OR REPLACE FUNCTION activate_node_snapshot()
|
||||
|
@ -39,4 +39,6 @@ UNION
|
|||
EXCEPT
|
||||
SELECT unnest(activate_node_snapshot()) as command
|
||||
)
|
||||
) AS foo WHERE command NOT ILIKE '%distributed_object_data%';
|
||||
) AS foo WHERE command NOT ILIKE '%distributed_object_data%' and
|
||||
-- sequences differ per node, so exclude
|
||||
command NOT ILIKE '%sequence%';
|
||||
|
|
|
@ -104,6 +104,19 @@ INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
|
|||
INSERT INTO colocated_partitioned_table SELECT i, '2020-01-05' FROM generate_series(0,1000)i;
|
||||
INSERT INTO sensors SELECT i, '2020-01-05', '{}' FROM generate_series(0,1000)i;
|
||||
|
||||
-- table for recursive view
|
||||
CREATE TABLE employees (employee_id int, manager_id int, full_name text);
|
||||
SELECT create_distributed_table('employees', 'employee_id');
|
||||
|
||||
-- table for owned_by_extension
|
||||
-- note that tables owned by extension are
|
||||
-- not added to the pg_dist_object, and assumed
|
||||
-- to exists on all nodes via the extension
|
||||
CREATE TABLE owned_by_extension_table (employee_id int, manager_id int, full_name text);
|
||||
ALTER EXTENSION plpgsql ADD TABLE post_11_upgrade.owned_by_extension_table;
|
||||
SELECT create_distributed_table('owned_by_extension_table', 'employee_id');
|
||||
SELECT run_command_on_workers($$CREATE TABLE post_11_upgrade.owned_by_extension_table (employee_id int, manager_id int, full_name text);$$);
|
||||
SELECT run_command_on_workers($$ALTER EXTENSION plpgsql ADD TABLE post_11_upgrade.owned_by_extension_table;$$);
|
||||
|
||||
SET citus.enable_ddl_propagation TO off;
|
||||
CREATE TEXT SEARCH CONFIGURATION post_11_upgrade.partial_index_test_config ( parser = default );
|
||||
|
@ -129,6 +142,83 @@ END;
|
|||
$$;');
|
||||
|
||||
CREATE TYPE post_11_upgrade.my_type AS (a int);
|
||||
CREATE VIEW post_11_upgrade.view_for_upgrade_test AS SELECT * FROM sensors;
|
||||
|
||||
-- one normally would not need views on the workers pre-11, but still
|
||||
-- nice test to have
|
||||
SELECT run_command_on_workers('SET citus.enable_ddl_propagation TO off;
|
||||
CREATE VIEW post_11_upgrade.view_for_upgrade_test AS SELECT * FROM sensors;');
|
||||
|
||||
|
||||
-- a non-distributed type dependency to a view
|
||||
-- both the view and the type should be distributed after the upgrade
|
||||
CREATE TYPE post_11_upgrade.my_type_for_view AS (a int);
|
||||
CREATE VIEW post_11_upgrade.view_for_upgrade_test_my_type (casted) AS SELECT row(measureid)::post_11_upgrade.my_type_for_view FROM sensors;
|
||||
|
||||
-- a local type, table and view, should not be distributed
|
||||
-- after the upgrade
|
||||
CREATE TYPE post_11_upgrade.local_type AS (a int);
|
||||
CREATE TABLE post_11_upgrade.non_dist_table_for_view(a int, b post_11_upgrade.local_type);
|
||||
CREATE VIEW post_11_upgrade.non_dist_upgrade_test_view AS SELECT * FROM non_dist_table_for_view;
|
||||
|
||||
-- a local table joined with a distributed table. In other words, the view has a local table dependency
|
||||
-- and should not be distributed after the upgrade
|
||||
CREATE TABLE post_11_upgrade.non_dist_dist_table_for_view(a int);
|
||||
CREATE VIEW post_11_upgrade.non_dist_upgrade_test_view_local_join AS SELECT * FROM non_dist_table_for_view JOIN sensors ON (true);
|
||||
|
||||
-- a view selecting from multiple
|
||||
-- distributed/reference tables should be marked as distributed
|
||||
CREATE VIEW post_11_upgrade.non_dist_upgrade_multiple_dist_view AS SELECT colocated_dist_table.* FROM colocated_dist_table JOIN sensors ON (true) JOIN reference_table ON (true);
|
||||
|
||||
-- a view selecting from reference table should be fine
|
||||
CREATE VIEW post_11_upgrade.non_dist_upgrade_ref_view AS SELECT * FROM reference_table;
|
||||
|
||||
-- a view selecting from another (distributed) view should also be distributed
|
||||
CREATE VIEW post_11_upgrade.non_dist_upgrade_ref_view_2 AS SELECT * FROM non_dist_upgrade_ref_view;
|
||||
|
||||
-- materialized views never becomes distributed
|
||||
CREATE MATERIALIZED VIEW post_11_upgrade.materialized_view AS SELECT * FROM reference_table;
|
||||
|
||||
CREATE VIEW post_11_upgrade.owned_by_extension_view AS SELECT * FROM reference_table;
|
||||
ALTER EXTENSION plpgsql ADD VIEW post_11_upgrade.owned_by_extension_view;
|
||||
|
||||
-- temporary views should not be marked as distributed
|
||||
CREATE VIEW pg_temp.temp_view_1 AS SELECT * FROM reference_table;
|
||||
CREATE temporary VIEW temp_view_2 AS SELECT * FROM reference_table;
|
||||
|
||||
-- we should be able to distribute recursive views
|
||||
CREATE OR REPLACE RECURSIVE VIEW reporting_line (employee_id, subordinates) AS
|
||||
SELECT employee_id,
|
||||
full_name AS subordinates
|
||||
FROM employees
|
||||
WHERE manager_id IS NULL
|
||||
UNION ALL
|
||||
SELECT e.employee_id,
|
||||
(rl.subordinates || ' > ' || e.full_name) AS subordinates
|
||||
FROM employees e
|
||||
INNER JOIN reporting_line rl ON e.manager_id = rl.employee_id;
|
||||
|
||||
-- v_test_1 and v_test_2 becomes circularly dependend views
|
||||
-- so we should not try to distribute any of the views
|
||||
CREATE VIEW post_11_upgrade.v_test_1 AS SELECT * FROM sensors;
|
||||
CREATE VIEW post_11_upgrade.v_test_2 AS SELECT * FROM sensors;
|
||||
CREATE OR REPLACE VIEW post_11_upgrade.v_test_1 AS SELECT sensors.* FROM sensors JOIN v_test_2 USING (measureid);
|
||||
CREATE OR REPLACE VIEW post_11_upgrade.v_test_2 AS SELECT sensors.* FROM sensors JOIN v_test_1 USING (measureid);
|
||||
|
||||
-- views that do not depeend on anything should be distributed
|
||||
CREATE VIEW post_11_upgrade.depends_on_nothing_1 AS SELECT * FROM (VALUES (1)) as values;
|
||||
CREATE VIEW post_11_upgrade.depends_on_nothing_2 AS SELECT 1;
|
||||
|
||||
-- views depends pg/citus objects should be distributed
|
||||
CREATE VIEW post_11_upgrade.depends_on_pg AS SELECT * FROM pg_class;
|
||||
CREATE VIEW post_11_upgrade.depends_on_citus AS SELECT * FROM pg_dist_partition;
|
||||
|
||||
-- views depend on sequences only should be distributed
|
||||
CREATE SEQUENCE post_11_upgrade.seq_bigint AS bigint INCREMENT BY 3 CACHE 10 CYCLE;
|
||||
CREATE VIEW post_11_upgrade.depends_on_seq AS SELECT nextval('post_11_upgrade.seq_bigint');
|
||||
|
||||
-- views depend on a sequence and a local table should not be distributed
|
||||
CREATE VIEW post_11_upgrade.depends_on_seq_and_no_support AS SELECT nextval('post_11_upgrade.seq_bigint') FROM post_11_upgrade.non_dist_table_for_view;
|
||||
|
||||
RESET citus.enable_ddl_propagation;
|
||||
|
||||
|
|
Loading…
Reference in New Issue