mirror of https://github.com/citusdata/citus.git
Propagate views upon distribution
parent
86781ec85a
commit
c322f4d1aa
|
@ -39,7 +39,6 @@ static List * FilterDistributedExtensions(List *extensionObjectList);
|
||||||
static List * ExtensionNameListToObjectAddressList(List *extensionObjectList);
|
static List * ExtensionNameListToObjectAddressList(List *extensionObjectList);
|
||||||
static void MarkExistingObjectDependenciesDistributedIfSupported(void);
|
static void MarkExistingObjectDependenciesDistributedIfSupported(void);
|
||||||
static List * GetAllViews(void);
|
static List * GetAllViews(void);
|
||||||
static bool ShouldMarkRelationDistributedOnUpgrade(Oid relationId);
|
|
||||||
static bool ShouldPropagateExtensionCommand(Node *parseTree);
|
static bool ShouldPropagateExtensionCommand(Node *parseTree);
|
||||||
static bool IsAlterExtensionSetSchemaCitus(Node *parseTree);
|
static bool IsAlterExtensionSetSchemaCitus(Node *parseTree);
|
||||||
static Node * RecreateExtensionStmt(Oid extensionOid);
|
static Node * RecreateExtensionStmt(Oid extensionOid);
|
||||||
|
@ -659,7 +658,7 @@ GetAllViews(void)
|
||||||
* decides whether the input relation should be marked as distributed
|
* decides whether the input relation should be marked as distributed
|
||||||
* during the upgrade.
|
* during the upgrade.
|
||||||
*/
|
*/
|
||||||
static bool
|
bool
|
||||||
ShouldMarkRelationDistributedOnUpgrade(Oid relationId)
|
ShouldMarkRelationDistributedOnUpgrade(Oid relationId)
|
||||||
{
|
{
|
||||||
if (!EnableMetadataSync)
|
if (!EnableMetadataSync)
|
||||||
|
|
|
@ -50,6 +50,7 @@
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/metadata_sync.h"
|
#include "distributed/metadata_sync.h"
|
||||||
#include "distributed/metadata_utility.h"
|
#include "distributed/metadata_utility.h"
|
||||||
|
#include "distributed/metadata/dependency.h"
|
||||||
#include "distributed/metadata/distobject.h"
|
#include "distributed/metadata/distobject.h"
|
||||||
#include "distributed/metadata/pg_dist_object.h"
|
#include "distributed/metadata/pg_dist_object.h"
|
||||||
#include "distributed/multi_executor.h"
|
#include "distributed/multi_executor.h"
|
||||||
|
@ -317,6 +318,38 @@ SyncCitusTableMetadata(Oid relationId)
|
||||||
ObjectAddressSet(relationAddress, RelationRelationId, relationId);
|
ObjectAddressSet(relationAddress, RelationRelationId, relationId);
|
||||||
MarkObjectDistributed(&relationAddress);
|
MarkObjectDistributed(&relationAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
CreateDependentViewsOnWorkers(relationId);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CreateDependentViewsOnWorkers takes a relationId and creates the views that depend on
|
||||||
|
* that relation on workers with metadata.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
CreateDependentViewsOnWorkers(Oid relationId)
|
||||||
|
{
|
||||||
|
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
|
||||||
|
|
||||||
|
List *views = GetDependingViews(relationId);
|
||||||
|
|
||||||
|
Oid viewOid = InvalidOid;
|
||||||
|
foreach_oid(viewOid, views)
|
||||||
|
{
|
||||||
|
if (!ShouldMarkRelationDistributedOnUpgrade(viewOid))
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
char *createViewCommand = CreateViewDDLCommand(viewOid);
|
||||||
|
char *alterViewOwnerCommand = AlterViewOwnerCommand(viewOid);
|
||||||
|
|
||||||
|
SendCommandToWorkersWithMetadata(createViewCommand);
|
||||||
|
SendCommandToWorkersWithMetadata(alterViewOwnerCommand);
|
||||||
|
}
|
||||||
|
|
||||||
|
SendCommandToWorkersWithMetadata(ENABLE_DDL_PROPAGATION);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -181,6 +181,7 @@ extern Oid get_constraint_typid(Oid conoid);
|
||||||
/* extension.c - forward declarations */
|
/* extension.c - forward declarations */
|
||||||
extern bool IsDropCitusExtensionStmt(Node *parsetree);
|
extern bool IsDropCitusExtensionStmt(Node *parsetree);
|
||||||
extern bool IsCreateAlterExtensionUpdateCitusStmt(Node *parsetree);
|
extern bool IsCreateAlterExtensionUpdateCitusStmt(Node *parsetree);
|
||||||
|
extern bool ShouldMarkRelationDistributedOnUpgrade(Oid relationId);
|
||||||
extern void ErrorIfUnstableCreateOrAlterExtensionStmt(Node *parsetree);
|
extern void ErrorIfUnstableCreateOrAlterExtensionStmt(Node *parsetree);
|
||||||
extern List * PostprocessCreateExtensionStmt(Node *stmt, const char *queryString);
|
extern List * PostprocessCreateExtensionStmt(Node *stmt, const char *queryString);
|
||||||
extern List * PreprocessDropExtensionStmt(Node *stmt, const char *queryString,
|
extern List * PreprocessDropExtensionStmt(Node *stmt, const char *queryString,
|
||||||
|
|
|
@ -32,6 +32,7 @@ typedef enum
|
||||||
/* Functions declarations for metadata syncing */
|
/* Functions declarations for metadata syncing */
|
||||||
extern void SyncNodeMetadataToNode(const char *nodeNameString, int32 nodePort);
|
extern void SyncNodeMetadataToNode(const char *nodeNameString, int32 nodePort);
|
||||||
extern void SyncCitusTableMetadata(Oid relationId);
|
extern void SyncCitusTableMetadata(Oid relationId);
|
||||||
|
extern void CreateDependentViewsOnWorkers(Oid relationId);
|
||||||
extern void EnsureSequentialModeMetadataOperations(void);
|
extern void EnsureSequentialModeMetadataOperations(void);
|
||||||
extern bool ClusterHasKnownMetadataWorkers(void);
|
extern bool ClusterHasKnownMetadataWorkers(void);
|
||||||
extern char * LocalGroupIdUpdateCommand(int32 groupId);
|
extern char * LocalGroupIdUpdateCommand(int32 groupId);
|
||||||
|
|
|
@ -570,11 +570,6 @@ BEGIN;
|
||||||
SET LOCAL citus.force_max_query_parallelization TO ON;
|
SET LOCAL citus.force_max_query_parallelization TO ON;
|
||||||
CREATE TABLE table_1_to_view_in_transaction(a int);
|
CREATE TABLE table_1_to_view_in_transaction(a int);
|
||||||
SELECT create_distributed_table('table_1_to_view_in_transaction', 'a');
|
SELECT create_distributed_table('table_1_to_view_in_transaction', 'a');
|
||||||
create_distributed_table
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
CREATE TABLE table_2_to_view_in_transaction(a int);
|
CREATE TABLE table_2_to_view_in_transaction(a int);
|
||||||
SELECT create_distributed_table('table_2_to_view_in_transaction', 'a');
|
SELECT create_distributed_table('table_2_to_view_in_transaction', 'a');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
|
@ -593,6 +588,43 @@ ERROR: cannot run view command because there was a parallel operation on a dist
|
||||||
DETAIL: When running command on/for a distributed view, Citus needs to perform all operations over a single connection per node to ensure consistency.
|
DETAIL: When running command on/for a distributed view, Citus needs to perform all operations over a single connection per node to ensure consistency.
|
||||||
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
|
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
-- verify that the views get distributed after the table is distributed
|
||||||
|
create table table_to_depend_on_1 (a int);
|
||||||
|
create table table_to_depend_on_2 (a int);
|
||||||
|
-- the first view depends on a table
|
||||||
|
create view dependent_view_1 as select count(*) from table_to_depend_on_1;
|
||||||
|
WARNING: "view dependent_view_1" has dependency to "table table_to_depend_on_1" that is not in Citus' metadata
|
||||||
|
DETAIL: "view dependent_view_1" will be created only locally
|
||||||
|
HINT: Distribute "table table_to_depend_on_1" first to distribute "view dependent_view_1"
|
||||||
|
-- the seconds view depends on two tables
|
||||||
|
create view dependent_view_2 as select count(*) from table_to_depend_on_1 join table_to_depend_on_2 on table_to_depend_on_1.a=table_to_depend_on_2.a;
|
||||||
|
WARNING: "view dependent_view_2" has dependency to "table table_to_depend_on_2" that is not in Citus' metadata
|
||||||
|
DETAIL: "view dependent_view_2" will be created only locally
|
||||||
|
HINT: Distribute "table table_to_depend_on_2" first to distribute "view dependent_view_2"
|
||||||
|
-- distribute only one table
|
||||||
|
select create_distributed_table('table_to_depend_on_1','a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- see two views on the coordinator
|
||||||
|
select viewname from pg_views where viewname like 'dependent_view__';
|
||||||
|
viewname
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
dependent_view_1
|
||||||
|
dependent_view_2
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
-- see one view on the worker
|
||||||
|
select viewname from pg_views where viewname like 'dependent_view__';
|
||||||
|
viewname
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
dependent_view_1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
SET client_min_messages TO ERROR;
|
SET client_min_messages TO ERROR;
|
||||||
DROP SCHEMA view_prop_schema_inner CASCADE;
|
DROP SCHEMA view_prop_schema_inner CASCADE;
|
||||||
DROP SCHEMA view_prop_schema CASCADE;
|
DROP SCHEMA view_prop_schema CASCADE;
|
||||||
|
|
|
@ -374,6 +374,23 @@ BEGIN;
|
||||||
ALTER TABLE view_prop_schema_inner.view_in_transaction RENAME COLUMN a TO b;
|
ALTER TABLE view_prop_schema_inner.view_in_transaction RENAME COLUMN a TO b;
|
||||||
DROP VIEW view_prop_schema_inner.view_in_transaction;
|
DROP VIEW view_prop_schema_inner.view_in_transaction;
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
-- verify that the views get distributed after the table is distributed
|
||||||
|
create table table_to_depend_on_1 (a int);
|
||||||
|
create table table_to_depend_on_2 (a int);
|
||||||
|
-- the first view depends on a table
|
||||||
|
create view dependent_view_1 as select count(*) from table_to_depend_on_1;
|
||||||
|
-- the seconds view depends on two tables
|
||||||
|
create view dependent_view_2 as select count(*) from table_to_depend_on_1 join table_to_depend_on_2 on table_to_depend_on_1.a=table_to_depend_on_2.a;
|
||||||
|
-- distribute only one table
|
||||||
|
select create_distributed_table('table_to_depend_on_1','a');
|
||||||
|
|
||||||
|
-- see two views on the coordinator
|
||||||
|
select viewname from pg_views where viewname like 'dependent_view__';
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
-- see one view on the worker
|
||||||
|
select viewname from pg_views where viewname like 'dependent_view__';
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
|
||||||
SET client_min_messages TO ERROR;
|
SET client_min_messages TO ERROR;
|
||||||
DROP SCHEMA view_prop_schema_inner CASCADE;
|
DROP SCHEMA view_prop_schema_inner CASCADE;
|
||||||
|
|
Loading…
Reference in New Issue