mx_views_properly
Onder Kalaci 2021-06-29 13:36:01 +02:00
parent c932642e3b
commit a5e09ffa2a
4 changed files with 106 additions and 32 deletions

View File

@ -1120,41 +1120,51 @@ GetViewCreationCommandsOfTable(Oid relationId)
Oid viewOid = InvalidOid;
foreach_oid(viewOid, views)
{
Datum viewDefinitionDatum = DirectFunctionCall1(pg_get_viewdef,
ObjectIdGetDatum(viewOid));
char *viewDefinition = TextDatumGetCString(viewDefinitionDatum);
StringInfo query = makeStringInfo();
char *viewName = get_rel_name(viewOid);
char *schemaName = get_namespace_name(get_rel_namespace(viewOid));
char *qualifiedViewName = quote_qualified_identifier(schemaName, viewName);
bool isMatView = get_rel_relkind(viewOid) == RELKIND_MATVIEW;
/* here we need to get the access method of the view to recreate it */
char *accessMethodName = GetAccessMethodForMatViewIfExists(viewOid);
appendStringInfoString(query, "CREATE ");
if (isMatView)
{
appendStringInfoString(query, "MATERIALIZED ");
}
appendStringInfo(query, "VIEW %s ", qualifiedViewName);
if (accessMethodName)
{
appendStringInfo(query, "USING %s ", accessMethodName);
}
appendStringInfo(query, "AS %s", viewDefinition);
commands = lappend(commands, makeTableDDLCommandString(query->data));
commands = lappend(commands, makeTableDDLCommandString(GetViewCreationCommand(
viewOid)));
}
return commands;
}
char *
GetViewCreationCommand(Oid viewOid)
{
Datum viewDefinitionDatum = DirectFunctionCall1(pg_get_viewdef,
ObjectIdGetDatum(viewOid));
char *viewDefinition = TextDatumGetCString(viewDefinitionDatum);
StringInfo query = makeStringInfo();
char *viewName = get_rel_name(viewOid);
char *schemaName = get_namespace_name(get_rel_namespace(viewOid));
char *qualifiedViewName = quote_qualified_identifier(schemaName, viewName);
bool isMatView = get_rel_relkind(viewOid) == RELKIND_MATVIEW;
/* here we need to get the access method of the view to recreate it */
char *accessMethodName = GetAccessMethodForMatViewIfExists(viewOid);
appendStringInfo(query, "DROP VIEW IF EXISTS %s CASCADE;", qualifiedViewName);
appendStringInfoString(query, "CREATE ");
if (isMatView)
{
appendStringInfoString(query, "MATERIALIZED ");
}
appendStringInfo(query, "VIEW %s ", qualifiedViewName);
if (accessMethodName)
{
appendStringInfo(query, "USING %s ", accessMethodName);
}
appendStringInfo(query, "AS %s", viewDefinition);
return query->data;
}
/*
* ReplaceTable replaces the source table with the target table.
* It moves all the rows of the source table to target table with INSERT SELECT.

View File

@ -69,6 +69,43 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
dependenciesWithCommands = lappend(dependenciesWithCommands, dependency);
}
}
List *metadataSyncedDependenciesWithCommands = NIL;
if (target->classId == RelationRelationId)
{
/* don't forget partitioned/foreign tables */
if (get_rel_relkind(target->objectId) == RELKIND_RELATION)
{
List *viewIds = GetDependingViews(target->objectId);
Oid viewOid = InvalidOid;
foreach_oid(viewOid, viewIds)
{
ObjectAddress viewAddress = { 0 };
ObjectAddressSet(viewAddress, RelationRelationId, viewOid);
dependencies = GetDependenciesForObject(&viewAddress);
foreach_ptr(dependency, dependencies)
{
List *dependencyCommands = GetDependencyCreateDDLCommands(dependency);
metadataSyncedDependenciesWithCommands = list_concat(
metadataSyncedDependenciesWithCommands, dependencyCommands);
char *viewDefinition = GetViewCreationCommand(viewOid);
metadataSyncedDependenciesWithCommands = lappend(
metadataSyncedDependenciesWithCommands, viewDefinition);
/* create a new list with dependencies that actually created commands */
if (list_length(dependencyCommands) > 0)
{
metadataSyncedDependenciesWithCommands = lappend(
metadataSyncedDependenciesWithCommands, dependency);
}
}
}
}
}
if (list_length(ddlCommands) <= 0)
{
/* no ddl commands to be executed */
@ -77,6 +114,9 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
/* since we are executing ddl commands lets disable propagation, primarily for mx */
ddlCommands = list_concat(list_make1(DISABLE_DDL_PROPAGATION), ddlCommands);
metadataSyncedDependenciesWithCommands = list_concat(list_make1(
DISABLE_DDL_PROPAGATION),
metadataSyncedDependenciesWithCommands);
/*
* Make sure that no new nodes are added after this point until the end of the
@ -112,7 +152,6 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
return;
}
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
@ -123,6 +162,20 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
CitusExtensionOwnerName(),
ddlCommands);
}
foreach_ptr(workerNode, workerNodeList)
{
const char *nodeName = workerNode->workerName;
uint32 nodePort = workerNode->workerPort;
if (workerNode->hasMetadata)
{
SendCommandListToWorkerInSingleTransaction(nodeName, nodePort,
CitusExtensionOwnerName(),
metadataSyncedDependenciesWithCommands);
}
}
}
@ -182,6 +235,13 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
{
return NIL;
}
else if (get_rel_relkind(dependency->objectId) == RELKIND_VIEW ||
get_rel_relkind(dependency->objectId) == RELKIND_MATVIEW)
{
char *viewDef = GetViewCreationCommand(dependency->objectId);
return list_make1(viewDef);
}
/* if this relation is not supported, break to the error at the end */
break;

View File

@ -648,7 +648,11 @@ SupportedDependencyByCitus(const ObjectAddress *address)
{
return true;
}
else if (get_rel_relkind(address->objectId) == RELKIND_VIEW ||
get_rel_relkind(address->objectId) == RELKIND_MATVIEW)
{
return true;
}
return false;
}

View File

@ -248,7 +248,7 @@ extern void CreateDistributedTable(Oid relationId, Var *distributionColumn,
bool viaDeprecatedAPI);
extern void CreateTruncateTrigger(Oid relationId);
extern TableConversionReturn * UndistributeTable(TableConversionParameters *params);
extern char * GetViewCreationCommand(Oid viewOid);
extern void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target);
extern List * GetDistributableDependenciesForObject(const ObjectAddress *target);
extern bool ShouldPropagate(void);