Introduce CREATE/DROP VIEW

velioglu/vpp
Burak Velioglu 2022-04-06 23:12:16 +03:00
parent f9bbcb8840
commit 96d91a48a4
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
9 changed files with 653 additions and 3 deletions

View File

@ -177,6 +177,14 @@ static DistributeObjectOps Any_CreateFunction = {
.address = CreateFunctionStmtObjectAddress,
.markDistributed = true,
};
static DistributeObjectOps Any_View = {
.deparse = DeparseViewStmt,
.qualify = NULL,
.preprocess = PreprocessViewStmt,
.postprocess = PostprocessViewStmt,
.address = ViewStmtObjectAddress,
.markDistributed = true,
};
static DistributeObjectOps Any_CreatePolicy = {
.deparse = NULL,
.qualify = NULL,
@ -385,6 +393,14 @@ static DistributeObjectOps Function_Drop = {
.address = NULL,
.markDistributed = false,
};
static DistributeObjectOps View_Drop = {
.deparse = DeparseDropViewStmt,
.qualify = QualifyDropViewStmt,
.preprocess = PreprocessDropViewStmt,
.postprocess = NULL,
.address = NULL,
.markDistributed = false,
};
static DistributeObjectOps Function_Rename = {
.deparse = DeparseRenameFunctionStmt,
.qualify = QualifyRenameFunctionStmt,
@ -1285,6 +1301,11 @@ GetDistributeObjectOps(Node *node)
return &Trigger_Drop;
}
case OBJECT_VIEW:
{
return &View_Drop;
}
default:
{
return &NoDistributeOps;
@ -1314,6 +1335,11 @@ GetDistributeObjectOps(Node *node)
return &Any_Index;
}
case T_ViewStmt:
{
return &Any_View;
}
case T_ReindexStmt:
{
return &Any_Reindex;

View File

@ -0,0 +1,235 @@
/*-------------------------------------------------------------------------
*
* view.c
* Commands for distributing CREATE OR REPLACE VIEW statements.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "fmgr.h"
#include "catalog/objectaddress.h"
#include "commands/extension.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/deparser.h"
#include "distributed/errormessage.h"
#include "distributed/listutils.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata/dependency.h"
#include "distributed/metadata/distobject.h"
#include "distributed/multi_executor.h"
#include "distributed/namespace_utils.h"
#include "distributed/worker_transaction.h"
#include "executor/spi.h"
#include "nodes/nodes.h"
#include "nodes/pg_list.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
static List * FilterNameListForDistributedViews(List *viewNamesList, bool missing_ok);
/*
* PreprocessViewStmt is called during the planning phase for CREATE OR REPLACE VIEW
* before it is created on the local node internally.
*
* We do our basic housekeeping where we make sure we are on the coordinator and
* qualify the given statement.
*/
List *
PreprocessViewStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
if (!ShouldPropagate())
{
return NIL;
}
/* check creation against multi-statement transaction policy */
if (!ShouldPropagateCreateInCoordinatedTransction())
{
return NIL;
}
EnsureCoordinator();
return NIL;
}
/*
* PostprocessViewStmt actually creates the plan we need to execute for view propagation.
*
* If view depends on any undistributable object, Citus can not distribute it. In order to
* not to prevent users from creating local views on the coordinator WARNING message will
* be sent to the customer about the case instead of erroring out.
*
* Besides creating the plan we also make sure all (new) dependencies of the view are
* created on all nodes.
*/
List *
PostprocessViewStmt(Node *node, const char *queryString)
{
ViewStmt *stmt = castNode(ViewStmt, node);
if (!ShouldPropagate())
{
return NIL;
}
/* check creation against multi-statement transaction policy */
if (!ShouldPropagateCreateInCoordinatedTransction())
{
return NIL;
}
ObjectAddress viewAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (IsObjectAddressOwnedByExtension(&viewAddress, NULL))
{
return NIL;
}
/* If the view has any unsupported dependency, create it locally */
DeferredErrorMessage *errMsg = DeferErrorIfHasUnsupportedDependency(&viewAddress);
if (errMsg != NULL)
{
RaiseDeferredError(errMsg, WARNING);
return NIL;
}
EnsureSequentialMode(OBJECT_VIEW);
EnsureDependenciesExistOnAllNodes(&viewAddress);
const char *sql = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* ViewStmtObjectAddress returns the ObjectAddress for the subject of the
* CREATE [OR REPLACE] VIEW statement. If missing_ok is false it will error with the
* normal postgres error for unfound views.
*/
ObjectAddress
ViewStmtObjectAddress(Node *node, bool missing_ok)
{
ViewStmt *stmt = castNode(ViewStmt, node);
Oid schemaOid = RangeVarGetCreationNamespace(stmt->view);
Oid viewOid = get_relname_relid(stmt->view->relname, schemaOid);
ObjectAddress viewAddress = { 0 };
ObjectAddressSet(viewAddress, RelationRelationId, viewOid);
return viewAddress;
}
/*
* PreprocessDropViewStmt gets called during the planning phase of a DROP VIEW statement
* and returns a list of DDLJob's that will drop any distributed view from the
* workers.
*
* The DropStmt could have multiple objects to drop, the list of objects will be filtered
* to only keep the distributed views for deletion on the workers. Non-distributed
* views will still be dropped locally but not on the workers.
*/
List *
PreprocessDropViewStmt(Node *node, const char *queryString, ProcessUtilityContext
processUtilityContext)
{
DropStmt *stmt = castNode(DropStmt, node);
List *distributedViews = NIL;
if (!ShouldPropagate())
{
return NIL;
}
/*
* Our statements need to be fully qualified so we can drop them from the right schema
* on the workers
*/
QualifyTreeNode((Node *) stmt);
distributedViews = FilterNameListForDistributedViews(stmt->objects, stmt->missing_ok);
if (list_length(distributedViews) < 1)
{
/* no distributed view to drop */
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_VIEW);
/*
* Swap the list of objects before deparsing and restore the old list after. This
* ensures we only have distributed views in the deparsed drop statement.
*/
DropStmt *stmtCopy = copyObject(stmt);
stmtCopy->objects = distributedViews;
const char *dropStmtSql = DeparseTreeNode((Node *) stmtCopy);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) dropStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* FilterNameListForDistributedViews takes a list of view names and filters against the
* views that are distributed. Given list of view names must be qualified before.
*
* The original list will not be touched, a new list will be created with only the objects
* in there.
*/
static List *
FilterNameListForDistributedViews(List *viewNamesList, bool missing_ok)
{
List *distributedViewNames = NIL;
List *qualifiedViewName = NULL;
foreach_ptr(qualifiedViewName, viewNamesList)
{
/*
* Name of the view must be qualified before calling this function
*/
Assert(list_length(qualifiedViewName) == 2);
char *schemaName = strVal(linitial(qualifiedViewName));
char *viewName = strVal(lsecond(qualifiedViewName));
Oid schemaId = get_namespace_oid(schemaName, missing_ok);
Oid viewOid = get_relname_relid(viewName, schemaId);
if (!OidIsValid(viewOid))
{
continue;
}
ObjectAddress viewAddress = { 0 };
ObjectAddressSet(viewAddress, RelationRelationId, viewOid);
if (IsObjectDistributed(&viewAddress))
{
distributedViewNames = lappend(distributedViewNames, qualifiedViewName);
}
}
return distributedViewNames;
}

View File

@ -0,0 +1,257 @@
/*-------------------------------------------------------------------------
*
* deparse_view_stmts.c
*
* All routines to deparse view statements.
*
* Copyright (c), Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/namespace.h"
#include "commands/defrem.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/commands.h"
#include "distributed/listutils.h"
#include "lib/stringinfo.h"
#include "nodes/parsenodes.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
static void AddQualifiedViewNameToCreateViewCommand(StringInfo buf, Oid viewOid);
static void AddAliasesToCreateViewCommand(StringInfo buf, ViewStmt *stmt);
static void AddOptionsToCreateViewCommand(StringInfo buf, ViewStmt *stmt);
static void AddViewDefinitionToCreateViewCommand(StringInfo buf, Oid viewOid);
static void AppendDropViewStmt(StringInfo buf, DropStmt *stmt);
static void AppendViewNameList(StringInfo buf, List *objects);
/*
* DeparseViewStmt deparses the given CREATE OR REPLACE VIEW statement
*/
char *
DeparseViewStmt(Node *node)
{
ViewStmt *stmt = castNode(ViewStmt, node);
StringInfo viewString = makeStringInfo();
Oid schemaOid = RangeVarGetCreationNamespace(stmt->view);
Oid viewOid = get_relname_relid(stmt->view->relname, schemaOid);
if (stmt->replace)
{
appendStringInfoString(viewString, "CREATE OR REPLACE ");
}
else
{
appendStringInfoString(viewString, "CREATE ");
}
if (stmt->view->relpersistence == RELPERSISTENCE_TEMP)
{
appendStringInfoString(viewString, "TEMPORARY ");
}
/* Skip recursive views for now */
appendStringInfo(viewString, "VIEW ");
AddQualifiedViewNameToCreateViewCommand(viewString, viewOid);
AddAliasesToCreateViewCommand(viewString, stmt);
AddOptionsToCreateViewCommand(viewString, stmt);
AddViewDefinitionToCreateViewCommand(viewString, viewOid);
return viewString->data;
}
/*
* AddQualifiedViewNameToCreateViewCommand adds the qualified view of the given view
* statement to the given create view command.
*/
static void
AddQualifiedViewNameToCreateViewCommand(StringInfo buf, Oid viewOid)
{
char *viewName = get_rel_name(viewOid);
char *schemaName = get_namespace_name(get_rel_namespace(viewOid));
char *qualifiedViewName = quote_qualified_identifier(schemaName, viewName);
appendStringInfo(buf, "%s ", qualifiedViewName);
}
/*
* AddAliasesToCreateViewCommand appends aliases (if exists) of the given view statement
* to the given create view command.
*/
static void
AddAliasesToCreateViewCommand(StringInfo buf, ViewStmt *stmt)
{
if (stmt->aliases == NIL)
{
return;
}
bool isFirstAlias = true;
ListCell *aliasItem;
foreach(aliasItem, stmt->aliases)
{
char *columnAliasName = pstrdup(strVal(lfirst(aliasItem)));
if (isFirstAlias)
{
appendStringInfoString(buf, "(");
isFirstAlias = false;
}
else
{
appendStringInfoString(buf, ",");
}
appendStringInfoString(buf, columnAliasName);
}
appendStringInfoString(buf, ") ");
}
/*
* AddOptionsToCreateViewCommand appends options (if exists) of the given view statement
* to the given create view command. Note that this function also handles
* WITH [CASCADED | LOCAL] CHECK OPTION part of the CREATE VIEW command as well.
*/
static void
AddOptionsToCreateViewCommand(StringInfo buf, ViewStmt *stmt)
{
if (list_length(stmt->options) == 0)
{
return;
}
bool isFirstOption = true;
ListCell *optionCell;
foreach(optionCell, stmt->options)
{
DefElem *option = (DefElem *) lfirst(optionCell);
if (isFirstOption)
{
appendStringInfoString(buf, "WITH (");
isFirstOption = false;
}
else
{
appendStringInfoString(buf, ",");
}
appendStringInfoString(buf, option->defname);
appendStringInfoString(buf, " = ");
appendStringInfoString(buf, defGetString(option));
}
appendStringInfoString(buf, ") ");
}
/*
* AddViewDefinitionToCreateViewCommand adds the definition of the given view to the
* given create view command.
*/
static void
AddViewDefinitionToCreateViewCommand(StringInfo buf, Oid viewOid)
{
/*
* Set search_path to NIL so that all objects outside of pg_catalog will be
* schema-prefixed. pg_catalog will be added automatically when we call
* PushOverrideSearchPath(), since we set addCatalog to true;
*/
OverrideSearchPath *overridePath = GetOverrideSearchPath(CurrentMemoryContext);
overridePath->schemas = NIL;
overridePath->addCatalog = true;
PushOverrideSearchPath(overridePath);
/*
* Push the transaction snapshot to be able to get vief definition with pg_get_viewdef
*/
PushActiveSnapshot(GetTransactionSnapshot());
Datum viewDefinitionDatum = DirectFunctionCall1(pg_get_viewdef,
ObjectIdGetDatum(viewOid));
char *viewDefinition = TextDatumGetCString(viewDefinitionDatum);
PopActiveSnapshot();
PopOverrideSearchPath();
appendStringInfo(buf, "AS %s ", viewDefinition);
}
/*
* DeparseDropViewStmt deparses the given DROP VIEW statement.
*/
char *
DeparseDropViewStmt(Node *node)
{
DropStmt *stmt = castNode(DropStmt, node);
StringInfoData str = { 0 };
initStringInfo(&str);
Assert(stmt->removeType == OBJECT_VIEW);
AppendDropViewStmt(&str, stmt);
return str.data;
}
/*
* AppendDropViewStmt appends the deparsed representation of given drop stmt
* to the given string info buffer.
*/
static void
AppendDropViewStmt(StringInfo buf, DropStmt *stmt)
{
/*
* already tested at call site, but for future it might be collapsed in a
* DeparseDropStmt so be safe and check again
*/
Assert(stmt->removeType == OBJECT_VIEW);
appendStringInfo(buf, "DROP VIEW ");
if (stmt->missing_ok)
{
appendStringInfoString(buf, "IF EXISTS ");
}
AppendViewNameList(buf, stmt->objects);
if (stmt->behavior == DROP_CASCADE)
{
appendStringInfoString(buf, " CASCADE");
}
appendStringInfoString(buf, ";");
}
/*
* AppendViewNameList appends the qualified view names by constructing them from the given
* objects list to the given string info buffer. Note that, objects must hold schema
* qualified view names as its' members.
*/
static void
AppendViewNameList(StringInfo buf, List *viewNamesList)
{
bool isFirstView = true;
List *qualifiedViewName = NULL;
foreach_ptr(qualifiedViewName, viewNamesList)
{
char *quotedQualifiedVieName = NameListToQuotedString(qualifiedViewName);
if (!isFirstView)
{
appendStringInfo(buf, ", ");
}
appendStringInfoString(buf, quotedQualifiedVieName);
isFirstView = false;
}
}

View File

@ -0,0 +1,52 @@
/*-------------------------------------------------------------------------
*
* qualify_view_stmt.c
* Functions specialized in fully qualifying all view statements. These
* functions are dispatched from qualify.c
*
* Copyright (c), Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/namespace.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "nodes/nodes.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
/*
* QualifyDropViewStmt quailifies the view names of the DROP VIEW statement.
*/
void
QualifyDropViewStmt(Node *node)
{
DropStmt *stmt = castNode(DropStmt, node);
List *qualifiedViewNames = NIL;
List *viewName = NULL;
foreach_ptr(viewName, stmt->objects)
{
/*
* If the view name is not qualified, qualify it. Else use it directly
*/
if (list_length(viewName) == 1)
{
char *objname = NULL;
Oid schemaOid = QualifiedNameGetCreationNamespace(viewName, &objname);
char *schemaName = get_namespace_name(schemaOid);
List *qualifiedViewName = list_make2(makeString(schemaName),
linitial(viewName));
qualifiedViewNames = lappend(qualifiedViewNames, qualifiedViewName);
}
else
{
qualifiedViewNames = lappend(qualifiedViewNames, viewName);
}
}
stmt->objects = qualifiedViewNames;
}

View File

@ -798,6 +798,11 @@ GetObjectTypeString(ObjectType objType)
return "type";
}
case OBJECT_VIEW:
{
return "view";
}
default:
{
ereport(DEBUG1, (errmsg("unsupported object type"),

View File

@ -162,6 +162,7 @@ static bool FollowAllDependencies(ObjectAddressCollector *collector,
DependencyDefinition *definition);
static void ApplyAddToDependencyList(ObjectAddressCollector *collector,
DependencyDefinition *definition);
static List * GetRelationRuleReferenceDependencyList(Oid relationId);
static List * ExpandCitusSupportedTypes(ObjectAddressCollector *collector,
ObjectAddress target);
static ViewDependencyNode * BuildViewDependencyGraph(Oid relationId, HTAB *nodeMap);
@ -422,7 +423,7 @@ DependencyDefinitionFromPgDepend(ObjectAddress target)
/*
* DependencyDefinitionFromPgDepend loads all pg_shdepend records describing the
* DependencyDefinitionFromPgShDepend loads all pg_shdepend records describing the
* dependencies of target.
*/
static List *
@ -734,7 +735,8 @@ SupportedDependencyByCitus(const ObjectAddress *address)
relKind == RELKIND_FOREIGN_TABLE ||
relKind == RELKIND_SEQUENCE ||
relKind == RELKIND_INDEX ||
relKind == RELKIND_PARTITIONED_INDEX)
relKind == RELKIND_PARTITIONED_INDEX ||
relKind == RELKIND_VIEW)
{
return true;
}
@ -1275,9 +1277,18 @@ ExpandCitusSupportedTypes(ObjectAddressCollector *collector, ObjectAddress targe
* create all objects required by the indices before we create the table
* including indices.
*/
List *indexDependencyList = GetRelationIndicesDependencyList(relationId);
result = list_concat(result, indexDependencyList);
/*
* Get the dependencies of the rule for the given relation. PG keeps internal
* dependency between relation and rule. As it is stated on the PG doc, if
* there is an internal dependency, NORMAL and AUTO dependencies of the
* dependent object behave much like they were dependencies of the referenced
* object.
*/
List *ruleRefDepList = GetRelationRuleReferenceDependencyList(relationId);
result = list_concat(result, ruleRefDepList);
}
default:
@ -1290,6 +1301,57 @@ ExpandCitusSupportedTypes(ObjectAddressCollector *collector, ObjectAddress targe
}
/*
* GetRelationRuleReferenceDependencyList returns the dependencies of the relation's
* internal rule dependencies.
*/
static List *
GetRelationRuleReferenceDependencyList(Oid relationId)
{
List *dependencyTupleList = GetPgDependTuplesForDependingObjects(RelationRelationId,
relationId);
List *nonInternalRuleDependencies = NIL;
HeapTuple depTup = NULL;
foreach_ptr(depTup, dependencyTupleList)
{
Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup);
/*
* Dependencies of the internal rule dependency should be handled as the dependency
* of referenced object.
*/
if (pg_depend->deptype == DEPENDENCY_INTERNAL && pg_depend->classid ==
RewriteRelationId)
{
ObjectAddress ruleAddress = { 0 };
ObjectAddressSet(ruleAddress, RewriteRelationId, pg_depend->objid);
/* Expand results with the noninternal dependencies of it */
List *ruleDependencies = DependencyDefinitionFromPgDepend(ruleAddress);
DependencyDefinition *dependencyDefinition = NULL;
foreach_ptr(dependencyDefinition, ruleDependencies)
{
/* Do not add internal dependencies and relation itself */
if (dependencyDefinition->data.pg_depend.deptype == DEPENDENCY_INTERNAL ||
(dependencyDefinition->data.pg_depend.refclassid ==
RelationRelationId &&
dependencyDefinition->data.pg_depend.refobjid == relationId))
{
continue;
}
nonInternalRuleDependencies = lappend(nonInternalRuleDependencies,
dependencyDefinition);
}
}
}
return nonInternalRuleDependencies;
}
/*
* GetRelationSequenceDependencyList returns the sequence dependency definition
* list for the given relation.

View File

@ -419,6 +419,7 @@ ErrorIfCurrentUserCanNotDistributeObject(ObjectType type, ObjectAddress *addr,
case OBJECT_TABLE:
case OBJECT_EXTENSION:
case OBJECT_COLLATION:
case OBJECT_VIEW:
{
check_object_ownership(userId, type, *addr, node, *relation);
break;

View File

@ -624,6 +624,16 @@ extern void UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
/* vacuum.c - forward declarations */
extern void PostprocessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand);
/* view.c - forward declarations */
extern List * PreprocessViewStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PostprocessViewStmt(Node *node, const char *queryString);
extern ObjectAddress ViewStmtObjectAddress(Node *node, bool missing_ok);
extern List * PreprocessDropViewStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern char * DeparseViewStmt(Node *node);
extern char * DeparseDropViewStmt(Node *node);
/* trigger.c - forward declarations */
extern List * GetExplicitTriggerCommandList(Oid relationId);
extern HeapTuple GetTriggerTupleById(Oid triggerId, bool missingOk);

View File

@ -121,6 +121,8 @@ extern void QualifyCreateEnumStmt(Node *stmt);
extern void QualifyAlterTypeSchemaStmt(Node *stmt);
extern void QualifyAlterTypeOwnerStmt(Node *stmt);
extern void QualifyDropViewStmt(Node *node);
extern ObjectAddress GetObjectAddressFromParseTree(Node *parseTree, bool missing_ok);
extern ObjectAddress RenameAttributeStmtObjectAddress(Node *stmt, bool missing_ok);