mirror of https://github.com/citusdata/citus.git
Propagate views while scaling out the cluster
parent
96d91a48a4
commit
e55417a04f
|
@ -349,6 +349,11 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
|
|||
return DDLCommandsForSequence(dependency->objectId, sequenceOwnerName);
|
||||
}
|
||||
|
||||
if (relKind == RELKIND_VIEW)
|
||||
{
|
||||
return CreateViewDDLCommandsIdempotent(dependency);
|
||||
}
|
||||
|
||||
/* if this relation is not supported, break to the error at the end */
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -11,9 +11,11 @@
|
|||
#include "postgres.h"
|
||||
#include "fmgr.h"
|
||||
|
||||
#include "access/genam.h"
|
||||
#include "catalog/objectaddress.h"
|
||||
#include "commands/extension.h"
|
||||
#include "distributed/commands.h"
|
||||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/commands/utility_hook.h"
|
||||
#include "distributed/deparser.h"
|
||||
#include "distributed/errormessage.h"
|
||||
|
@ -29,10 +31,13 @@
|
|||
#include "nodes/pg_list.h"
|
||||
#include "tcop/utility.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/fmgroids.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/syscache.h"
|
||||
|
||||
static List * FilterNameListForDistributedViews(List *viewNamesList, bool missing_ok);
|
||||
static void AppendAliasesToCreateViewCommandForExistingView(StringInfo createViewCommand,
|
||||
Oid viewOid);
|
||||
|
||||
/*
|
||||
* PreprocessViewStmt is called during the planning phase for CREATE OR REPLACE VIEW
|
||||
|
@ -233,3 +238,105 @@ FilterNameListForDistributedViews(List *viewNamesList, bool missing_ok)
|
|||
}
|
||||
return distributedViewNames;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateViewDDLCommandsIdempotent returns a list of DDL statements (const char *) to be
|
||||
* executed on a node to recreate the view addressed by the viewAddress.
|
||||
*/
|
||||
List *
|
||||
CreateViewDDLCommandsIdempotent(const ObjectAddress *viewAddress)
|
||||
{
|
||||
StringInfo createViewCommand = makeStringInfo();
|
||||
|
||||
Oid viewOid = viewAddress->objectId;
|
||||
|
||||
char *viewName = get_rel_name(viewOid);
|
||||
Oid schemaOid = get_rel_namespace(viewOid);
|
||||
char *schemaName = get_namespace_name(schemaOid);
|
||||
|
||||
appendStringInfoString(createViewCommand, "CREATE OR REPLACE VIEW ");
|
||||
char *qualifiedViewName = NameListToQuotedString(list_make2(makeString(schemaName),
|
||||
makeString(viewName)));
|
||||
|
||||
appendStringInfo(createViewCommand, "%s ", qualifiedViewName);
|
||||
|
||||
/* Add column aliases to create view command */
|
||||
AppendAliasesToCreateViewCommandForExistingView(createViewCommand, viewOid);
|
||||
|
||||
/* Add rel options to create view command */
|
||||
char *relOptions = flatten_reloptions(viewOid);
|
||||
if (relOptions != NULL)
|
||||
{
|
||||
appendStringInfo(createViewCommand, "WITH (%s) ", relOptions);
|
||||
pfree(relOptions);
|
||||
}
|
||||
|
||||
/* Add view definition to create view command */
|
||||
AddViewDefinitionToCreateViewCommand(createViewCommand, viewOid);
|
||||
|
||||
/* Add alter owner commmand */
|
||||
StringInfo alterOwnerCommand = makeStringInfo();
|
||||
char *viewOwnerName = TableOwner(viewOid);
|
||||
appendStringInfo(alterOwnerCommand,
|
||||
"ALTER VIEW %s OWNER TO %s", qualifiedViewName,
|
||||
quote_identifier(viewOwnerName));
|
||||
|
||||
return list_make2(createViewCommand->data,
|
||||
alterOwnerCommand->data);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AppendAliasesToCreateViewCommandForExistingView appends aliases to the create view
|
||||
* command for the existing view.
|
||||
*/
|
||||
static void
|
||||
AppendAliasesToCreateViewCommandForExistingView(StringInfo createViewCommand, Oid viewOid)
|
||||
{
|
||||
/* Get column name aliases from pg_attribute */
|
||||
ScanKeyData key[1];
|
||||
ScanKeyInit(&key[0],
|
||||
Anum_pg_attribute_attrelid,
|
||||
BTEqualStrategyNumber, F_OIDEQ,
|
||||
ObjectIdGetDatum(viewOid));
|
||||
|
||||
/* TODO: Check the lock */
|
||||
Relation maprel = table_open(AttributeRelationId, AccessShareLock);
|
||||
Relation mapidx = index_open(AttributeRelidNumIndexId, AccessShareLock);
|
||||
SysScanDesc pgAttributeScan = systable_beginscan_ordered(maprel, mapidx, NULL, 1,
|
||||
key);
|
||||
|
||||
bool isInitialAlias = true;
|
||||
bool hasAlias = false;
|
||||
HeapTuple attributeTuple;
|
||||
while (HeapTupleIsValid(attributeTuple = systable_getnext_ordered(pgAttributeScan,
|
||||
ForwardScanDirection)))
|
||||
{
|
||||
Form_pg_attribute att = (Form_pg_attribute) GETSTRUCT(attributeTuple);
|
||||
char *aliasName = NameStr(att->attname);
|
||||
|
||||
if (isInitialAlias)
|
||||
{
|
||||
appendStringInfoString(createViewCommand, "(");
|
||||
}
|
||||
else
|
||||
{
|
||||
appendStringInfoString(createViewCommand, ",");
|
||||
}
|
||||
|
||||
appendStringInfoString(createViewCommand, aliasName);
|
||||
|
||||
hasAlias = true;
|
||||
isInitialAlias = false;
|
||||
}
|
||||
|
||||
if (hasAlias)
|
||||
{
|
||||
appendStringInfoString(createViewCommand, ") ");
|
||||
}
|
||||
|
||||
systable_endscan_ordered(pgAttributeScan);
|
||||
index_close(mapidx, AccessShareLock);
|
||||
table_close(maprel, AccessShareLock);
|
||||
}
|
||||
|
|
|
@ -80,7 +80,6 @@ static void deparse_index_columns(StringInfo buffer, List *indexParameterList,
|
|||
static void AppendStorageParametersToString(StringInfo stringBuffer,
|
||||
List *optionList);
|
||||
static void simple_quote_literal(StringInfo buf, const char *val);
|
||||
static char * flatten_reloptions(Oid relid);
|
||||
static void AddVacuumParams(ReindexStmt *reindexStmt, StringInfo buffer);
|
||||
|
||||
|
||||
|
@ -1231,7 +1230,7 @@ pg_get_replica_identity_command(Oid tableRelationId)
|
|||
* This function comes from PostgreSQL source code in
|
||||
* src/backend/utils/adt/ruleutils.c
|
||||
*/
|
||||
static char *
|
||||
char *
|
||||
flatten_reloptions(Oid relid)
|
||||
{
|
||||
char *result = NULL;
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
#include "commands/defrem.h"
|
||||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/commands.h"
|
||||
#include "distributed/deparser.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "nodes/parsenodes.h"
|
||||
|
@ -24,7 +25,6 @@
|
|||
static void AddQualifiedViewNameToCreateViewCommand(StringInfo buf, Oid viewOid);
|
||||
static void AddAliasesToCreateViewCommand(StringInfo buf, ViewStmt *stmt);
|
||||
static void AddOptionsToCreateViewCommand(StringInfo buf, ViewStmt *stmt);
|
||||
static void AddViewDefinitionToCreateViewCommand(StringInfo buf, Oid viewOid);
|
||||
static void AppendDropViewStmt(StringInfo buf, DropStmt *stmt);
|
||||
static void AppendViewNameList(StringInfo buf, List *objects);
|
||||
|
||||
|
@ -54,13 +54,18 @@ DeparseViewStmt(Node *node)
|
|||
appendStringInfoString(viewString, "TEMPORARY ");
|
||||
}
|
||||
|
||||
/* Skip recursive views for now */
|
||||
|
||||
appendStringInfo(viewString, "VIEW ");
|
||||
|
||||
AddQualifiedViewNameToCreateViewCommand(viewString, viewOid);
|
||||
AddAliasesToCreateViewCommand(viewString, stmt);
|
||||
AddOptionsToCreateViewCommand(viewString, stmt);
|
||||
|
||||
/*
|
||||
* Note that Postgres converts CREATE RECURSIVE VIEW commands to
|
||||
* CREATE VIEW ... WITH RECURSIVE and pg_get_viewdef return it properly.
|
||||
* So, we don't need to RECURSIVE views separately while obtaining the
|
||||
* view creation command
|
||||
*/
|
||||
AddViewDefinitionToCreateViewCommand(viewString, viewOid);
|
||||
|
||||
return viewString->data;
|
||||
|
@ -159,7 +164,7 @@ AddOptionsToCreateViewCommand(StringInfo buf, ViewStmt *stmt)
|
|||
* AddViewDefinitionToCreateViewCommand adds the definition of the given view to the
|
||||
* given create view command.
|
||||
*/
|
||||
static void
|
||||
void
|
||||
AddViewDefinitionToCreateViewCommand(StringInfo buf, Oid viewOid)
|
||||
{
|
||||
/*
|
||||
|
|
|
@ -1330,20 +1330,19 @@ GetRelationRuleReferenceDependencyList(Oid relationId)
|
|||
/* Expand results with the noninternal dependencies of it */
|
||||
List *ruleDependencies = DependencyDefinitionFromPgDepend(ruleAddress);
|
||||
|
||||
DependencyDefinition *dependencyDefinition = NULL;
|
||||
DependencyDefinition *dependencyDef = NULL;
|
||||
foreach_ptr(dependencyDefinition, ruleDependencies)
|
||||
{
|
||||
/* Do not add internal dependencies and relation itself */
|
||||
if (dependencyDefinition->data.pg_depend.deptype == DEPENDENCY_INTERNAL ||
|
||||
(dependencyDefinition->data.pg_depend.refclassid ==
|
||||
RelationRelationId &&
|
||||
dependencyDefinition->data.pg_depend.refobjid == relationId))
|
||||
if (dependencyDef->data.pg_depend.deptype == DEPENDENCY_INTERNAL ||
|
||||
(dependencyDef->data.pg_depend.refclassid == RelationRelationId &&
|
||||
dependencyDef->data.pg_depend.refobjid == relationId))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
nonInternalRuleDependencies = lappend(nonInternalRuleDependencies,
|
||||
dependencyDefinition);
|
||||
dependencyDef);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,6 +46,7 @@ extern char * pg_get_indexclusterdef_string(Oid indexRelationId);
|
|||
extern bool contain_nextval_expression_walker(Node *node, void *context);
|
||||
extern char * pg_get_replica_identity_command(Oid tableRelationId);
|
||||
extern const char * RoleSpecString(RoleSpec *spec, bool withQuoteIdentifier);
|
||||
extern char * flatten_reloptions(Oid relid);
|
||||
|
||||
/* Function declarations for version dependent PostgreSQL ruleutils functions */
|
||||
extern void pg_get_query_def(Query *query, StringInfo buffer);
|
||||
|
|
|
@ -631,6 +631,7 @@ extern List * PostprocessViewStmt(Node *node, const char *queryString);
|
|||
extern ObjectAddress ViewStmtObjectAddress(Node *node, bool missing_ok);
|
||||
extern List * PreprocessDropViewStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * CreateViewDDLCommandsIdempotent(const ObjectAddress *viewAddress);
|
||||
extern char * DeparseViewStmt(Node *node);
|
||||
extern char * DeparseDropViewStmt(Node *node);
|
||||
|
||||
|
|
|
@ -122,6 +122,7 @@ extern void QualifyAlterTypeSchemaStmt(Node *stmt);
|
|||
extern void QualifyAlterTypeOwnerStmt(Node *stmt);
|
||||
|
||||
extern void QualifyDropViewStmt(Node *node);
|
||||
extern void AddViewDefinitionToCreateViewCommand(StringInfo buf, Oid viewOid);
|
||||
|
||||
extern ObjectAddress GetObjectAddressFromParseTree(Node *parseTree, bool missing_ok);
|
||||
extern ObjectAddress RenameAttributeStmtObjectAddress(Node *stmt, bool missing_ok);
|
||||
|
|
Loading…
Reference in New Issue