mirror of https://github.com/citusdata/citus.git
Mark existing views as distributed when upgrade to 11.0+
We have a mechanism which ensures that newly distributed objects are recorded during `alter extension citus update`. However, the logic was lacking "view"s. With this commit, we make sure that existing views are also marked as distributed during upgrade.update_view_rebase
parent
544e6c7428
commit
ec700644ff
|
@ -516,19 +516,70 @@ MarkExistingObjectDependenciesDistributedIfSupported()
|
||||||
/* refrain reading the metadata cache for all tables */
|
/* refrain reading the metadata cache for all tables */
|
||||||
if (ShouldSyncTableMetadataViaCatalog(citusTableId))
|
if (ShouldSyncTableMetadataViaCatalog(citusTableId))
|
||||||
{
|
{
|
||||||
/* we need to pass pointer allocated in the heap */
|
/*
|
||||||
ObjectAddress *addressPointer = palloc0(sizeof(ObjectAddress));
|
* All the distributable dependencies of a table should be marked as
|
||||||
*addressPointer = tableAddress;
|
* distributed.
|
||||||
|
*/
|
||||||
/* as of Citus 11, tables that should be synced are also considered object */
|
|
||||||
resultingObjectAddresses = lappend(resultingObjectAddresses, addressPointer);
|
|
||||||
}
|
|
||||||
|
|
||||||
List *distributableDependencyObjectAddresses =
|
List *distributableDependencyObjectAddresses =
|
||||||
GetDistributableDependenciesForObject(&tableAddress);
|
GetDistributableDependenciesForObject(&tableAddress);
|
||||||
|
|
||||||
resultingObjectAddresses = list_concat(resultingObjectAddresses,
|
resultingObjectAddresses =
|
||||||
|
list_concat(resultingObjectAddresses,
|
||||||
distributableDependencyObjectAddresses);
|
distributableDependencyObjectAddresses);
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We mark tables distributed immediately because we also need to mark
|
||||||
|
* views as distributed. We check whether the views that depend on
|
||||||
|
* the table has any auto-distirbutable dependencies below. Citus
|
||||||
|
* currently cannot "auto" distribute tables as dependencies, so we
|
||||||
|
* mark them distributed immediately.
|
||||||
|
*
|
||||||
|
* If a view depends on multiple tables, that view will be marked
|
||||||
|
* as distributed while it is processed for the last citus
|
||||||
|
* table.
|
||||||
|
*/
|
||||||
|
MarkObjectDistributedLocally(&tableAddress);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* As of Citus 11, views on Citus tables that do not have any unsupported
|
||||||
|
* dependency should also be distributed.
|
||||||
|
*/
|
||||||
|
List *viewList = GetDependingViews(citusTableId);
|
||||||
|
Oid viewOid = InvalidOid;
|
||||||
|
foreach_oid(viewOid, viewList)
|
||||||
|
{
|
||||||
|
ObjectAddress viewAddress = { 0 };
|
||||||
|
ObjectAddressSet(viewAddress, RelationRelationId, viewOid);
|
||||||
|
|
||||||
|
if (DeferErrorIfHasUnsupportedDependency(&viewAddress) != NULL)
|
||||||
|
{
|
||||||
|
/* this view has unsupported dependencies, skip */
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Now, mark the view and its dependencies as distributed as
|
||||||
|
* well. For example, a type could only be depending to a
|
||||||
|
* view.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/* we need to pass pointer allocated in the heap */
|
||||||
|
ObjectAddress *addressPointer = palloc0(sizeof(ObjectAddress));
|
||||||
|
*addressPointer = viewAddress;
|
||||||
|
|
||||||
|
/* */
|
||||||
|
resultingObjectAddresses =
|
||||||
|
lappend(resultingObjectAddresses, addressPointer);
|
||||||
|
|
||||||
|
distributableDependencyObjectAddresses =
|
||||||
|
GetDistributableDependenciesForObject(&viewAddress);
|
||||||
|
|
||||||
|
resultingObjectAddresses =
|
||||||
|
list_concat(resultingObjectAddresses,
|
||||||
|
distributableDependencyObjectAddresses);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* resolve dependencies of the objects in pg_dist_object*/
|
/* resolve dependencies of the objects in pg_dist_object*/
|
||||||
|
|
|
@ -46,7 +46,6 @@
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
|
|
||||||
|
|
||||||
static void MarkObjectDistributedLocally(const ObjectAddress *distAddress);
|
|
||||||
static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress);
|
static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress);
|
||||||
static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
|
static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
|
||||||
Datum *paramValues);
|
Datum *paramValues);
|
||||||
|
@ -195,7 +194,7 @@ MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress)
|
||||||
* This function should never be called alone, MarkObjectDistributed() or
|
* This function should never be called alone, MarkObjectDistributed() or
|
||||||
* MarkObjectDistributedViaSuperUser() should be called.
|
* MarkObjectDistributedViaSuperUser() should be called.
|
||||||
*/
|
*/
|
||||||
static void
|
void
|
||||||
MarkObjectDistributedLocally(const ObjectAddress *distAddress)
|
MarkObjectDistributedLocally(const ObjectAddress *distAddress)
|
||||||
{
|
{
|
||||||
int paramCount = 3;
|
int paramCount = 3;
|
||||||
|
|
|
@ -152,6 +152,7 @@ typedef struct MetadataCacheData
|
||||||
Oid distShardShardidIndexId;
|
Oid distShardShardidIndexId;
|
||||||
Oid distPlacementShardidIndexId;
|
Oid distPlacementShardidIndexId;
|
||||||
Oid distPlacementPlacementidIndexId;
|
Oid distPlacementPlacementidIndexId;
|
||||||
|
Oid distColocationidIndexId;
|
||||||
Oid distPlacementGroupidIndexId;
|
Oid distPlacementGroupidIndexId;
|
||||||
Oid distTransactionRelationId;
|
Oid distTransactionRelationId;
|
||||||
Oid distTransactionGroupIndexId;
|
Oid distTransactionGroupIndexId;
|
||||||
|
@ -2506,6 +2507,17 @@ DistPlacementPlacementidIndexId(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* return oid of pg_dist_colocation_pkey */
|
||||||
|
Oid
|
||||||
|
DistColocationIndexId(void)
|
||||||
|
{
|
||||||
|
CachedRelationLookup("pg_dist_colocation_pkey",
|
||||||
|
&MetadataCache.distColocationidIndexId);
|
||||||
|
|
||||||
|
return MetadataCache.distColocationidIndexId;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* return oid of pg_dist_transaction relation */
|
/* return oid of pg_dist_transaction relation */
|
||||||
Oid
|
Oid
|
||||||
DistTransactionRelationId(void)
|
DistTransactionRelationId(void)
|
||||||
|
|
|
@ -3411,12 +3411,19 @@ ColocationGroupCreateCommandList(void)
|
||||||
"distributioncolumncollationschema) AS (VALUES ");
|
"distributioncolumncollationschema) AS (VALUES ");
|
||||||
|
|
||||||
Relation pgDistColocation = table_open(DistColocationRelationId(), AccessShareLock);
|
Relation pgDistColocation = table_open(DistColocationRelationId(), AccessShareLock);
|
||||||
|
Relation colocationIdIndexRel = index_open(DistColocationIndexId(), AccessShareLock);
|
||||||
|
|
||||||
bool indexOK = false;
|
/*
|
||||||
SysScanDesc scanDescriptor = systable_beginscan(pgDistColocation, InvalidOid, indexOK,
|
* It is not strictly necessary to read the tuples in order.
|
||||||
|
* However, it is useful to get consistent behavior, both for regression
|
||||||
|
* tests and also in production systems.
|
||||||
|
*/
|
||||||
|
SysScanDesc scanDescriptor =
|
||||||
|
systable_beginscan_ordered(pgDistColocation, colocationIdIndexRel,
|
||||||
NULL, 0, NULL);
|
NULL, 0, NULL);
|
||||||
|
|
||||||
HeapTuple colocationTuple = systable_getnext(scanDescriptor);
|
HeapTuple colocationTuple = systable_getnext_ordered(scanDescriptor,
|
||||||
|
ForwardScanDirection);
|
||||||
|
|
||||||
while (HeapTupleIsValid(colocationTuple))
|
while (HeapTupleIsValid(colocationTuple))
|
||||||
{
|
{
|
||||||
|
@ -3474,10 +3481,11 @@ ColocationGroupCreateCommandList(void)
|
||||||
"NULL, NULL)");
|
"NULL, NULL)");
|
||||||
}
|
}
|
||||||
|
|
||||||
colocationTuple = systable_getnext(scanDescriptor);
|
colocationTuple = systable_getnext_ordered(scanDescriptor, ForwardScanDirection);
|
||||||
}
|
}
|
||||||
|
|
||||||
systable_endscan(scanDescriptor);
|
systable_endscan_ordered(scanDescriptor);
|
||||||
|
index_close(colocationIdIndexRel, AccessShareLock);
|
||||||
table_close(pgDistColocation, AccessShareLock);
|
table_close(pgDistColocation, AccessShareLock);
|
||||||
|
|
||||||
if (!hasColocations)
|
if (!hasColocations)
|
||||||
|
|
|
@ -24,6 +24,7 @@ extern bool IsObjectDistributed(const ObjectAddress *address);
|
||||||
extern bool ClusterHasDistributedFunctionWithDistArgument(void);
|
extern bool ClusterHasDistributedFunctionWithDistArgument(void);
|
||||||
extern void MarkObjectDistributed(const ObjectAddress *distAddress);
|
extern void MarkObjectDistributed(const ObjectAddress *distAddress);
|
||||||
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
|
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
|
||||||
|
extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress);
|
||||||
extern void UnmarkObjectDistributed(const ObjectAddress *address);
|
extern void UnmarkObjectDistributed(const ObjectAddress *address);
|
||||||
extern bool IsTableOwnedByExtension(Oid relationId);
|
extern bool IsTableOwnedByExtension(Oid relationId);
|
||||||
extern bool IsObjectAddressOwnedByExtension(const ObjectAddress *target,
|
extern bool IsObjectAddressOwnedByExtension(const ObjectAddress *target,
|
||||||
|
|
|
@ -238,6 +238,7 @@ extern Oid DistShardLogicalRelidIndexId(void);
|
||||||
extern Oid DistShardShardidIndexId(void);
|
extern Oid DistShardShardidIndexId(void);
|
||||||
extern Oid DistPlacementShardidIndexId(void);
|
extern Oid DistPlacementShardidIndexId(void);
|
||||||
extern Oid DistPlacementPlacementidIndexId(void);
|
extern Oid DistPlacementPlacementidIndexId(void);
|
||||||
|
extern Oid DistColocationIndexId(void);
|
||||||
extern Oid DistTransactionRelationId(void);
|
extern Oid DistTransactionRelationId(void);
|
||||||
extern Oid DistTransactionGroupIndexId(void);
|
extern Oid DistTransactionGroupIndexId(void);
|
||||||
extern Oid DistPlacementGroupidIndexId(void);
|
extern Oid DistPlacementGroupidIndexId(void);
|
||||||
|
|
|
@ -9,24 +9,32 @@ NOTICE: Preparing to sync the metadata to all nodes
|
||||||
t
|
t
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- tables are objects with Citus 11+
|
-- tables, views and their dependencies become objects with Citus 11+
|
||||||
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype) ORDER BY 1;
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype, 'post_11_upgrade.employees'::regclass, 'post_11_upgrade.view_for_upgrade_test'::regclass, 'post_11_upgrade.my_type_for_view'::regtype, 'post_11_upgrade.view_for_upgrade_test_my_type'::regclass, 'post_11_upgrade.non_dist_table_for_view'::regclass, 'post_11_upgrade.non_dist_upgrade_test_view'::regclass, 'post_11_upgrade.non_dist_upgrade_test_view_local_join'::regclass, 'post_11_upgrade.non_dist_upgrade_multiple_dist_view'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view_2'::regclass, 'post_11_upgrade.reporting_line'::regclass, 'post_11_upgrade.v_test_1'::regclass, 'post_11_upgrade.v_test_2'::regclass) ORDER BY 1;
|
||||||
pg_identify_object_as_address
|
pg_identify_object_as_address
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(function,"{post_11_upgrade,func_in_transaction_def}",{})
|
(function,"{post_11_upgrade,func_in_transaction_def}",{})
|
||||||
(schema,{post_11_upgrade},{})
|
(schema,{post_11_upgrade},{})
|
||||||
|
(table,"{post_11_upgrade,employees}",{})
|
||||||
(table,"{post_11_upgrade,part_table}",{})
|
(table,"{post_11_upgrade,part_table}",{})
|
||||||
(table,"{post_11_upgrade,sensors}",{})
|
(table,"{post_11_upgrade,sensors}",{})
|
||||||
("text search configuration","{post_11_upgrade,partial_index_test_config}",{})
|
("text search configuration","{post_11_upgrade,partial_index_test_config}",{})
|
||||||
(type,{post_11_upgrade.my_type},{})
|
(type,{post_11_upgrade.my_type},{})
|
||||||
(6 rows)
|
(type,{post_11_upgrade.my_type_for_view},{})
|
||||||
|
(view,"{post_11_upgrade,non_dist_upgrade_multiple_dist_view}",{})
|
||||||
|
(view,"{post_11_upgrade,non_dist_upgrade_ref_view}",{})
|
||||||
|
(view,"{post_11_upgrade,non_dist_upgrade_ref_view_2}",{})
|
||||||
|
(view,"{post_11_upgrade,reporting_line}",{})
|
||||||
|
(view,"{post_11_upgrade,view_for_upgrade_test}",{})
|
||||||
|
(view,"{post_11_upgrade,view_for_upgrade_test_my_type}",{})
|
||||||
|
(14 rows)
|
||||||
|
|
||||||
-- on all nodes
|
-- on all nodes
|
||||||
SELECT run_command_on_workers($$SELECT array_agg(pg_identify_object_as_address(classid, objid, objsubid)) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype) ORDER BY 1;$$) ORDER BY 1;
|
SELECT run_command_on_workers($$SELECT array_agg(pg_identify_object_as_address(classid, objid, objsubid)) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype, 'post_11_upgrade.view_for_upgrade_test'::regclass, 'post_11_upgrade.view_for_upgrade_test_my_type'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view_2'::regclass, 'post_11_upgrade.reporting_line'::regclass) ORDER BY 1;$$) ORDER BY 1;
|
||||||
run_command_on_workers
|
run_command_on_workers
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(localhost,57636,t,"{""(type,{post_11_upgrade.my_type},{})"",""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})""}")
|
(localhost,57636,t,"{""(type,{post_11_upgrade.my_type},{})"",""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test_my_type}\\"",{})"",""(view,\\""{post_11_upgrade,non_dist_upgrade_ref_view_2}\\"",{})"",""(view,\\""{post_11_upgrade,reporting_line}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})""}")
|
||||||
(localhost,57637,t,"{""(type,{post_11_upgrade.my_type},{})"",""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})""}")
|
(localhost,57637,t,"{""(type,{post_11_upgrade.my_type},{})"",""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test_my_type}\\"",{})"",""(view,\\""{post_11_upgrade,non_dist_upgrade_ref_view_2}\\"",{})"",""(view,\\""{post_11_upgrade,reporting_line}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})""}")
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
-- Create the necessary test utility function
|
-- Create the necessary test utility function
|
||||||
|
|
|
@ -114,6 +114,14 @@ INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
|
||||||
INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
|
INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
|
||||||
INSERT INTO colocated_partitioned_table SELECT i, '2020-01-05' FROM generate_series(0,1000)i;
|
INSERT INTO colocated_partitioned_table SELECT i, '2020-01-05' FROM generate_series(0,1000)i;
|
||||||
INSERT INTO sensors SELECT i, '2020-01-05', '{}' FROM generate_series(0,1000)i;
|
INSERT INTO sensors SELECT i, '2020-01-05', '{}' FROM generate_series(0,1000)i;
|
||||||
|
-- table for recursive view
|
||||||
|
CREATE TABLE employees (employee_id int, manager_id int, full_name text);
|
||||||
|
SELECT create_distributed_table('employees', 'employee_id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
SET citus.enable_ddl_propagation TO off;
|
SET citus.enable_ddl_propagation TO off;
|
||||||
CREATE TEXT SEARCH CONFIGURATION post_11_upgrade.partial_index_test_config ( parser = default );
|
CREATE TEXT SEARCH CONFIGURATION post_11_upgrade.partial_index_test_config ( parser = default );
|
||||||
SELECT 1 FROM run_command_on_workers($$CREATE TEXT SEARCH CONFIGURATION post_11_upgrade.partial_index_test_config ( parser = default );$$);
|
SELECT 1 FROM run_command_on_workers($$CREATE TEXT SEARCH CONFIGURATION post_11_upgrade.partial_index_test_config ( parser = default );$$);
|
||||||
|
@ -147,6 +155,57 @@ $$;');
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
CREATE TYPE post_11_upgrade.my_type AS (a int);
|
CREATE TYPE post_11_upgrade.my_type AS (a int);
|
||||||
|
CREATE VIEW post_11_upgrade.view_for_upgrade_test AS SELECT * FROM sensors;
|
||||||
|
-- one normally would not need views on the workers pre-11, but still
|
||||||
|
-- nice test to have
|
||||||
|
SELECT run_command_on_workers('SET citus.enable_ddl_propagation TO off;
|
||||||
|
CREATE VIEW post_11_upgrade.view_for_upgrade_test AS SELECT * FROM sensors;');
|
||||||
|
run_command_on_workers
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(localhost,57636,t,SET)
|
||||||
|
(localhost,57637,t,SET)
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- a non-distributed type dependency to a view
|
||||||
|
-- both the view and the type should be distributed after the upgrade
|
||||||
|
CREATE TYPE post_11_upgrade.my_type_for_view AS (a int);
|
||||||
|
CREATE VIEW post_11_upgrade.view_for_upgrade_test_my_type (casted) AS SELECT row(measureid)::post_11_upgrade.my_type_for_view FROM sensors;
|
||||||
|
-- a local type, table and view, should not be distributed
|
||||||
|
-- after the upgrade
|
||||||
|
CREATE TYPE post_11_upgrade.local_type AS (a int);
|
||||||
|
CREATE TABLE post_11_upgrade.non_dist_table_for_view(a int, b post_11_upgrade.local_type);
|
||||||
|
CREATE VIEW post_11_upgrade.non_dist_upgrade_test_view AS SELECT * FROM non_dist_table_for_view;
|
||||||
|
-- a local table joined with a distributed table. In other words, the view has a local table dependency
|
||||||
|
-- and should not be distributed after the upgrade
|
||||||
|
CREATE TABLE post_11_upgrade.non_dist_dist_table_for_view(a int);
|
||||||
|
CREATE VIEW post_11_upgrade.non_dist_upgrade_test_view_local_join AS SELECT * FROM non_dist_table_for_view JOIN sensors ON (true);
|
||||||
|
-- a view selecting from multiple
|
||||||
|
-- distributed/reference tables should be marked as distributed
|
||||||
|
CREATE VIEW post_11_upgrade.non_dist_upgrade_multiple_dist_view AS SELECT colocated_dist_table.* FROM colocated_dist_table JOIN sensors ON (true) JOIN reference_table ON (true);
|
||||||
|
-- a view selecting from reference table should be fine
|
||||||
|
CREATE VIEW post_11_upgrade.non_dist_upgrade_ref_view AS SELECT * FROM reference_table;
|
||||||
|
-- a view selecting from another (distributed) view should also be distributed
|
||||||
|
CREATE VIEW post_11_upgrade.non_dist_upgrade_ref_view_2 AS SELECT * FROM non_dist_upgrade_ref_view;
|
||||||
|
-- temporary views should not be marked as distributed
|
||||||
|
CREATE VIEW pg_temp.temp_view_1 AS SELECT * FROM reference_table;
|
||||||
|
CREATE temporary VIEW temp_view_2 AS SELECT * FROM reference_table;
|
||||||
|
-- we should be able to distribute recursive views
|
||||||
|
CREATE OR REPLACE RECURSIVE VIEW reporting_line (employee_id, subordinates) AS
|
||||||
|
SELECT employee_id,
|
||||||
|
full_name AS subordinates
|
||||||
|
FROM employees
|
||||||
|
WHERE manager_id IS NULL
|
||||||
|
UNION ALL
|
||||||
|
SELECT e.employee_id,
|
||||||
|
(rl.subordinates || ' > ' || e.full_name) AS subordinates
|
||||||
|
FROM employees e
|
||||||
|
INNER JOIN reporting_line rl ON e.manager_id = rl.employee_id;
|
||||||
|
-- v_test_1 and v_test_2 becomes circularly dependend views
|
||||||
|
-- so we should not try to distribute any of the views
|
||||||
|
CREATE VIEW post_11_upgrade.v_test_1 AS SELECT * FROM sensors;
|
||||||
|
CREATE VIEW post_11_upgrade.v_test_2 AS SELECT * FROM sensors;
|
||||||
|
CREATE OR REPLACE VIEW post_11_upgrade.v_test_1 AS SELECT sensors.* FROM sensors JOIN v_test_2 USING (measureid);
|
||||||
|
CREATE OR REPLACE VIEW post_11_upgrade.v_test_2 AS SELECT sensors.* FROM sensors JOIN v_test_1 USING (measureid);
|
||||||
RESET citus.enable_ddl_propagation;
|
RESET citus.enable_ddl_propagation;
|
||||||
CREATE TABLE sensors_parser(
|
CREATE TABLE sensors_parser(
|
||||||
measureid integer,
|
measureid integer,
|
||||||
|
|
|
@ -4,11 +4,11 @@ SET search_path = post_11_upgrade;
|
||||||
UPDATE pg_dist_node_metadata SET metadata=jsonb_set(metadata, '{partitioned_citus_table_exists_pre_11}', to_jsonb('true'::bool), true);
|
UPDATE pg_dist_node_metadata SET metadata=jsonb_set(metadata, '{partitioned_citus_table_exists_pre_11}', to_jsonb('true'::bool), true);
|
||||||
SELECT citus_finalize_upgrade_to_citus11(enforce_version_check:=false);
|
SELECT citus_finalize_upgrade_to_citus11(enforce_version_check:=false);
|
||||||
|
|
||||||
-- tables are objects with Citus 11+
|
-- tables, views and their dependencies become objects with Citus 11+
|
||||||
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype) ORDER BY 1;
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype, 'post_11_upgrade.employees'::regclass, 'post_11_upgrade.view_for_upgrade_test'::regclass, 'post_11_upgrade.my_type_for_view'::regtype, 'post_11_upgrade.view_for_upgrade_test_my_type'::regclass, 'post_11_upgrade.non_dist_table_for_view'::regclass, 'post_11_upgrade.non_dist_upgrade_test_view'::regclass, 'post_11_upgrade.non_dist_upgrade_test_view_local_join'::regclass, 'post_11_upgrade.non_dist_upgrade_multiple_dist_view'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view_2'::regclass, 'post_11_upgrade.reporting_line'::regclass, 'post_11_upgrade.v_test_1'::regclass, 'post_11_upgrade.v_test_2'::regclass) ORDER BY 1;
|
||||||
|
|
||||||
-- on all nodes
|
-- on all nodes
|
||||||
SELECT run_command_on_workers($$SELECT array_agg(pg_identify_object_as_address(classid, objid, objsubid)) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype) ORDER BY 1;$$) ORDER BY 1;
|
SELECT run_command_on_workers($$SELECT array_agg(pg_identify_object_as_address(classid, objid, objsubid)) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype, 'post_11_upgrade.view_for_upgrade_test'::regclass, 'post_11_upgrade.view_for_upgrade_test_my_type'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view_2'::regclass, 'post_11_upgrade.reporting_line'::regclass) ORDER BY 1;$$) ORDER BY 1;
|
||||||
|
|
||||||
-- Create the necessary test utility function
|
-- Create the necessary test utility function
|
||||||
CREATE OR REPLACE FUNCTION activate_node_snapshot()
|
CREATE OR REPLACE FUNCTION activate_node_snapshot()
|
||||||
|
|
|
@ -104,6 +104,9 @@ INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
|
||||||
INSERT INTO colocated_partitioned_table SELECT i, '2020-01-05' FROM generate_series(0,1000)i;
|
INSERT INTO colocated_partitioned_table SELECT i, '2020-01-05' FROM generate_series(0,1000)i;
|
||||||
INSERT INTO sensors SELECT i, '2020-01-05', '{}' FROM generate_series(0,1000)i;
|
INSERT INTO sensors SELECT i, '2020-01-05', '{}' FROM generate_series(0,1000)i;
|
||||||
|
|
||||||
|
-- table for recursive view
|
||||||
|
CREATE TABLE employees (employee_id int, manager_id int, full_name text);
|
||||||
|
SELECT create_distributed_table('employees', 'employee_id');
|
||||||
|
|
||||||
SET citus.enable_ddl_propagation TO off;
|
SET citus.enable_ddl_propagation TO off;
|
||||||
CREATE TEXT SEARCH CONFIGURATION post_11_upgrade.partial_index_test_config ( parser = default );
|
CREATE TEXT SEARCH CONFIGURATION post_11_upgrade.partial_index_test_config ( parser = default );
|
||||||
|
@ -129,6 +132,62 @@ END;
|
||||||
$$;');
|
$$;');
|
||||||
|
|
||||||
CREATE TYPE post_11_upgrade.my_type AS (a int);
|
CREATE TYPE post_11_upgrade.my_type AS (a int);
|
||||||
|
CREATE VIEW post_11_upgrade.view_for_upgrade_test AS SELECT * FROM sensors;
|
||||||
|
|
||||||
|
-- one normally would not need views on the workers pre-11, but still
|
||||||
|
-- nice test to have
|
||||||
|
SELECT run_command_on_workers('SET citus.enable_ddl_propagation TO off;
|
||||||
|
CREATE VIEW post_11_upgrade.view_for_upgrade_test AS SELECT * FROM sensors;');
|
||||||
|
|
||||||
|
|
||||||
|
-- a non-distributed type dependency to a view
|
||||||
|
-- both the view and the type should be distributed after the upgrade
|
||||||
|
CREATE TYPE post_11_upgrade.my_type_for_view AS (a int);
|
||||||
|
CREATE VIEW post_11_upgrade.view_for_upgrade_test_my_type (casted) AS SELECT row(measureid)::post_11_upgrade.my_type_for_view FROM sensors;
|
||||||
|
|
||||||
|
-- a local type, table and view, should not be distributed
|
||||||
|
-- after the upgrade
|
||||||
|
CREATE TYPE post_11_upgrade.local_type AS (a int);
|
||||||
|
CREATE TABLE post_11_upgrade.non_dist_table_for_view(a int, b post_11_upgrade.local_type);
|
||||||
|
CREATE VIEW post_11_upgrade.non_dist_upgrade_test_view AS SELECT * FROM non_dist_table_for_view;
|
||||||
|
|
||||||
|
-- a local table joined with a distributed table. In other words, the view has a local table dependency
|
||||||
|
-- and should not be distributed after the upgrade
|
||||||
|
CREATE TABLE post_11_upgrade.non_dist_dist_table_for_view(a int);
|
||||||
|
CREATE VIEW post_11_upgrade.non_dist_upgrade_test_view_local_join AS SELECT * FROM non_dist_table_for_view JOIN sensors ON (true);
|
||||||
|
|
||||||
|
-- a view selecting from multiple
|
||||||
|
-- distributed/reference tables should be marked as distributed
|
||||||
|
CREATE VIEW post_11_upgrade.non_dist_upgrade_multiple_dist_view AS SELECT colocated_dist_table.* FROM colocated_dist_table JOIN sensors ON (true) JOIN reference_table ON (true);
|
||||||
|
|
||||||
|
-- a view selecting from reference table should be fine
|
||||||
|
CREATE VIEW post_11_upgrade.non_dist_upgrade_ref_view AS SELECT * FROM reference_table;
|
||||||
|
|
||||||
|
-- a view selecting from another (distributed) view should also be distributed
|
||||||
|
CREATE VIEW post_11_upgrade.non_dist_upgrade_ref_view_2 AS SELECT * FROM non_dist_upgrade_ref_view;
|
||||||
|
|
||||||
|
-- temporary views should not be marked as distributed
|
||||||
|
CREATE VIEW pg_temp.temp_view_1 AS SELECT * FROM reference_table;
|
||||||
|
CREATE temporary VIEW temp_view_2 AS SELECT * FROM reference_table;
|
||||||
|
|
||||||
|
-- we should be able to distribute recursive views
|
||||||
|
CREATE OR REPLACE RECURSIVE VIEW reporting_line (employee_id, subordinates) AS
|
||||||
|
SELECT employee_id,
|
||||||
|
full_name AS subordinates
|
||||||
|
FROM employees
|
||||||
|
WHERE manager_id IS NULL
|
||||||
|
UNION ALL
|
||||||
|
SELECT e.employee_id,
|
||||||
|
(rl.subordinates || ' > ' || e.full_name) AS subordinates
|
||||||
|
FROM employees e
|
||||||
|
INNER JOIN reporting_line rl ON e.manager_id = rl.employee_id;
|
||||||
|
|
||||||
|
-- v_test_1 and v_test_2 becomes circularly dependend views
|
||||||
|
-- so we should not try to distribute any of the views
|
||||||
|
CREATE VIEW post_11_upgrade.v_test_1 AS SELECT * FROM sensors;
|
||||||
|
CREATE VIEW post_11_upgrade.v_test_2 AS SELECT * FROM sensors;
|
||||||
|
CREATE OR REPLACE VIEW post_11_upgrade.v_test_1 AS SELECT sensors.* FROM sensors JOIN v_test_2 USING (measureid);
|
||||||
|
CREATE OR REPLACE VIEW post_11_upgrade.v_test_2 AS SELECT sensors.* FROM sensors JOIN v_test_1 USING (measureid);
|
||||||
|
|
||||||
RESET citus.enable_ddl_propagation;
|
RESET citus.enable_ddl_propagation;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue