mirror of https://github.com/citusdata/citus.git
wip, failing
parent
c932642e3b
commit
7df6a42a70
|
@ -520,6 +520,29 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
|
|||
}
|
||||
|
||||
CreateTableMetadataOnWorkers(relationId);
|
||||
|
||||
/* TODO: should probably be in CreateTableMetadataOnWorkers() */
|
||||
if (ClusterHasKnownMetadataWorkers())
|
||||
{
|
||||
/*
|
||||
* Ensure that the views are also propagated to the metadata workers
|
||||
*/
|
||||
List *viewList = GetDependingViews(relationId);
|
||||
PropagateDependenciesOfViewList(viewList);
|
||||
|
||||
/* prevent recursive propagation */
|
||||
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
|
||||
|
||||
/* send the commands one by one */
|
||||
Oid viewId;
|
||||
List *viewCommandList = NIL;
|
||||
foreach_oid(viewId, viewList)
|
||||
{
|
||||
char *viewDef = GetViewCreationCommand(viewId);
|
||||
viewCommandList = lappend(viewCommandList, viewDef);
|
||||
SendCommandToWorkersWithMetadata(viewDef);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -565,6 +588,43 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
|
|||
}
|
||||
|
||||
|
||||
char *
|
||||
GetViewCreationCommand(Oid viewOid)
|
||||
{
|
||||
Datum viewDefinitionDatum = DirectFunctionCall1(pg_get_viewdef,
|
||||
ObjectIdGetDatum(viewOid));
|
||||
char *viewDefinition = TextDatumGetCString(viewDefinitionDatum);
|
||||
StringInfo query = makeStringInfo();
|
||||
char *viewName = get_rel_name(viewOid);
|
||||
char *schemaName = get_namespace_name(get_rel_namespace(viewOid));
|
||||
char *qualifiedViewName = quote_qualified_identifier(schemaName, viewName);
|
||||
bool isMatView = get_rel_relkind(viewOid) == RELKIND_MATVIEW;
|
||||
|
||||
/* here we need to get the access method of the view to recreate it */
|
||||
/*char *accessMethodName = GetAccessMethodForMatViewIfExists(viewOid); */
|
||||
|
||||
/*appendStringInfo(query, "DROP VIEW IF EXISTS %s CASCADE;", qualifiedViewName); */
|
||||
|
||||
appendStringInfoString(query, "CREATE ");
|
||||
|
||||
if (isMatView)
|
||||
{
|
||||
appendStringInfoString(query, "MATERIALIZED ");
|
||||
}
|
||||
|
||||
appendStringInfo(query, "VIEW %s ", qualifiedViewName);
|
||||
|
||||
/* if (accessMethodName) */
|
||||
{
|
||||
/* appendStringInfo(query, "USING %s ", accessMethodName); */
|
||||
}
|
||||
|
||||
appendStringInfo(query, "AS %s", viewDefinition);
|
||||
|
||||
return query->data;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* EnsureSequenceTypeSupported ensures that the type of the column that uses
|
||||
* a sequence on its DEFAULT is consistent with previous uses (if any) of the
|
||||
|
@ -641,6 +701,28 @@ AlterSequenceType(Oid seqOid, Oid typeOid)
|
|||
}
|
||||
|
||||
|
||||
void
|
||||
PropagateDependenciesOfViewList(List *viewList)
|
||||
{
|
||||
Oid viewOid = InvalidOid;
|
||||
foreach_oid(viewOid, viewList)
|
||||
{
|
||||
PropagateDependenciesOfView(viewOid);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
PropagateDependenciesOfView(Oid viewId)
|
||||
{
|
||||
/* get sequence address */
|
||||
ObjectAddress viewAddress = { 0 };
|
||||
ObjectAddressSet(viewAddress, RelationRelationId, viewId);
|
||||
EnsureDependenciesExistOnAllNodes(&viewAddress);
|
||||
MarkObjectDistributed(&viewAddress);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* MarkSequenceListDistributedAndPropagateDependencies ensures dependencies
|
||||
* for the given sequence list exist on all nodes and marks the sequences
|
||||
|
|
|
@ -602,6 +602,38 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
|
|||
errhint("Connect to worker nodes directly to manually "
|
||||
"rename the role")));
|
||||
}
|
||||
|
||||
|
||||
if (IsA(parsetree, ViewStmt))
|
||||
{
|
||||
CommandCounterIncrement();
|
||||
elog(INFO, "create view");
|
||||
ViewStmt *v = (ViewStmt *)parsetree;
|
||||
|
||||
Oid viewOid = RangeVarGetRelid(v->view, NoLock, false);
|
||||
elog(INFO, "viewOid: %d", viewOid);
|
||||
/* TODO: check the view depends on a distributed table that we should sync */
|
||||
if (ClusterHasKnownMetadataWorkers())
|
||||
{
|
||||
/*
|
||||
* Ensure that the views are also propagated to the metadata workers
|
||||
*/
|
||||
PropagateDependenciesOfViewList(list_make1_oid(viewOid));
|
||||
|
||||
/* prevent recursive propagation */
|
||||
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
|
||||
|
||||
/* send the commands one by one */
|
||||
{
|
||||
char *viewDef = GetViewCreationCommand(viewOid);
|
||||
elog(INFO, "viewDef: %s", viewDef);
|
||||
|
||||
SendCommandToWorkersWithMetadata(viewDef);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
if (IsA(parsetree, CreateStmt))
|
||||
|
|
|
@ -321,6 +321,7 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
|
|||
}
|
||||
}
|
||||
|
||||
#include "distributed/metadata/dependency.h"
|
||||
|
||||
/*
|
||||
* MetadataCreateCommands returns list of queries that are
|
||||
|
@ -481,6 +482,35 @@ MetadataCreateCommands(void)
|
|||
shardCreateCommandList);
|
||||
}
|
||||
|
||||
|
||||
/* after all tables are created, create the metadata */
|
||||
foreach_ptr(cacheEntry, propagatedTableList)
|
||||
{
|
||||
/* TODO: should probably be in CreateTableMetadataOnWorkers() */
|
||||
|
||||
/*
|
||||
* Ensure that the views are also propagated to the metadata workers
|
||||
*/
|
||||
List *viewList = GetDependingViews(cacheEntry->relationId);
|
||||
PropagateDependenciesOfViewList(viewList);
|
||||
|
||||
/* prevent recursive propagation */
|
||||
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
|
||||
|
||||
/* send the commands one by one */
|
||||
Oid viewId;
|
||||
List *viewCommandList = NIL;
|
||||
foreach_oid(viewId, viewList)
|
||||
{
|
||||
char *viewDef = GetViewCreationCommand(viewId);
|
||||
viewCommandList = lappend(viewCommandList, viewDef);
|
||||
}
|
||||
|
||||
|
||||
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
|
||||
viewCommandList);
|
||||
}
|
||||
|
||||
return metadataSnapshotCommandList;
|
||||
}
|
||||
|
||||
|
|
|
@ -246,6 +246,7 @@ extern void CreateDistributedTable(Oid relationId, Var *distributionColumn,
|
|||
char distributionMethod, int shardCount,
|
||||
bool shardCountIsStrict, char *colocateWithTableName,
|
||||
bool viaDeprecatedAPI);
|
||||
extern char * GetViewCreationCommand(Oid viewOid);
|
||||
extern void CreateTruncateTrigger(Oid relationId);
|
||||
extern TableConversionReturn * UndistributeTable(TableConversionParameters *params);
|
||||
|
||||
|
@ -297,4 +298,7 @@ extern void MarkSequenceDistributedAndPropagateDependencies(Oid sequenceOid);
|
|||
extern void EnsureDistributedSequencesHaveOneType(Oid relationId,
|
||||
List *dependentSequenceList,
|
||||
List *attnumList);
|
||||
|
||||
void PropagateDependenciesOfView(Oid viewId);
|
||||
void PropagateDependenciesOfViewList(List *viewList);
|
||||
#endif /* METADATA_UTILITY_H */
|
||||
|
|
Loading…
Reference in New Issue