mirror of https://github.com/citusdata/citus.git
Use unique id as a prefix in CIMV creation
Relname itself is not unique across the database hence for internal CIMV related tables we use a unique id prefix instead of just relname so that the same mv in two different schemas don't cause overlap.cimv
parent
d240f071c5
commit
6c9872d182
|
@ -17,6 +17,9 @@
|
|||
#include "distributed/pg_cimv.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/security_utils.h"
|
||||
#include "distributed/sequence_utils.h"
|
||||
#include "distributed/coordinator_protocol.h"
|
||||
#include "executor/spi.h"
|
||||
#include "miscadmin.h"
|
||||
#include "nodes/makefuncs.h"
|
||||
|
@ -64,6 +67,8 @@ typedef struct
|
|||
CitusTableCacheEntry *citusTable;
|
||||
TargetEntry *partitionColumn;
|
||||
bool supportsDelete;
|
||||
char* prefix;
|
||||
int prefixId;
|
||||
} CimvCreate;
|
||||
|
||||
static void CreateCimv(CimvCreate *cimvCreate);
|
||||
|
@ -97,6 +102,8 @@ static Oid PartialAggOid(void);
|
|||
static void AppendQuotedLiteral(StringInfo buf, const char *val);
|
||||
static void AppendStringInfoFunction(StringInfo buf, Oid fnoid);
|
||||
static Oid AggregateFunctionOid(const char *functionName, Oid inputType);
|
||||
static char* CIMVTriggerFuncName(int prefixId, const char* relname);
|
||||
static char* CIMVInternalPrefix(const RangeVar* baseTable, int prefixId);
|
||||
|
||||
bool
|
||||
ProcessCreateMaterializedViewStmt(const CreateTableAsStmt *stmt, const char *query_string,
|
||||
|
@ -864,28 +871,33 @@ InitializeCimvCreate(const CreateTableAsStmt *stmt, MatViewCreateOptions *create
|
|||
cimvCreate->formCimv->jobid = 0;
|
||||
cimvCreate->formCimv->landingtable = InvalidOid;
|
||||
|
||||
namestrcpy(&cimvCreate->formCimv->triggerfnnamespace, CITUS_INTERNAL_SCHEMA);
|
||||
namestrcpy(&cimvCreate->formCimv->triggerfnname, stmt->into->rel->relname);
|
||||
|
||||
StringInfo mat = makeStringInfo();
|
||||
appendStringInfo(mat, "%s_cimv_%s", stmt->into->rel->relname,
|
||||
MATERIALIZATION_TABLE_SUFFIX);
|
||||
|
||||
StringInfo rv = makeStringInfo();
|
||||
appendStringInfo(rv, "%s_cimv_%s", stmt->into->rel->relname, REFRESH_VIEW_SUFFIX);
|
||||
|
||||
StringInfo ld = makeStringInfo();
|
||||
appendStringInfo(ld, "%s_cimv_%s", stmt->into->rel->relname, LANDING_TABLE_SUFFIX);
|
||||
|
||||
Query *query = (Query *) stmt->query;
|
||||
RangeTblEntry *baseRte = (RangeTblEntry *) linitial(query->rtable);
|
||||
|
||||
cimvCreate->stmt = stmt;
|
||||
cimvCreate->createOptions = createOptions;
|
||||
cimvCreate->baseTableName = makeRangeVar(get_namespace_name(get_rel_namespace(
|
||||
baseRte->relid)),
|
||||
get_rel_name(baseRte->relid), -1);
|
||||
|
||||
cimvCreate->stmt = stmt;
|
||||
cimvCreate->createOptions = createOptions;
|
||||
|
||||
cimvCreate->formCimv->basetable = baseRte->relid;
|
||||
|
||||
cimvCreate->prefixId = UniqueId();
|
||||
cimvCreate->prefix = CIMVInternalPrefix(cimvCreate->baseTableName, cimvCreate->prefixId);
|
||||
namestrcpy(&cimvCreate->formCimv->triggerfnnamespace, CITUS_INTERNAL_SCHEMA);
|
||||
char* funcName = CIMVTriggerFuncName(cimvCreate->prefixId, stmt->into->rel->relname);
|
||||
namestrcpy(&cimvCreate->formCimv->triggerfnname, funcName);
|
||||
StringInfo mat = makeStringInfo();
|
||||
appendStringInfo(mat, "%s_cimv_%s", cimvCreate->prefix, MATERIALIZATION_TABLE_SUFFIX);
|
||||
|
||||
StringInfo rv = makeStringInfo();
|
||||
appendStringInfo(rv, "%s_cimv_%s", cimvCreate->prefix, REFRESH_VIEW_SUFFIX);
|
||||
|
||||
StringInfo ld = makeStringInfo();
|
||||
appendStringInfo(ld, "%s_cimv_%s", cimvCreate->prefix, LANDING_TABLE_SUFFIX);
|
||||
|
||||
cimvCreate->matTableName = makeRangeVar(CITUS_INTERNAL_SCHEMA, mat->data, -1);
|
||||
cimvCreate->userViewName = stmt->into->rel;
|
||||
cimvCreate->refreshViewName = makeRangeVar(CITUS_INTERNAL_SCHEMA, rv->data, -1);
|
||||
|
@ -918,6 +930,24 @@ InitializeCimvCreate(const CreateTableAsStmt *stmt, MatViewCreateOptions *create
|
|||
return cimvCreate;
|
||||
}
|
||||
|
||||
static char* CIMVTriggerFuncName(int prefixId, const char* relname) {
|
||||
StringInfo funcName = makeStringInfo();
|
||||
appendStringInfo(funcName, "%s_%d",quote_identifier(relname), prefixId);
|
||||
return funcName->data;
|
||||
}
|
||||
|
||||
static char* CIMVInternalPrefix(const RangeVar* baseTable, int prefixId) {
|
||||
|
||||
if (baseTable->schemaname == NULL || baseTable->relname == NULL) {
|
||||
ereport(ERROR, (errmsg("unexpected state: schema name or relname not found.")));
|
||||
}
|
||||
|
||||
StringInfo prefix = makeStringInfo();
|
||||
appendStringInfo(prefix, "%s_%s_%d",quote_identifier(baseTable->schemaname),
|
||||
quote_identifier(baseTable->relname), prefixId);
|
||||
return prefix->data;
|
||||
}
|
||||
|
||||
|
||||
static MatViewCreateOptions *
|
||||
GetMatViewCreateOptions(const CreateTableAsStmt *stmt)
|
||||
|
|
|
@ -6,5 +6,6 @@ DROP FUNCTION IF EXISTS pg_catalog.citus_total_relation_size(regclass);
|
|||
#include "udfs/citus_total_relation_size/10.0-1.sql"
|
||||
#include "udfs/citus_tables/10.0-1.sql"
|
||||
#include "udfs/citus_finish_pg_upgrade/10.0-1.sql"
|
||||
#include "udfs/citus_unique_id/10.0-1.sql"
|
||||
|
||||
#include "../../columnar/sql/columnar--9.5-1--10.0-1.sql"
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
|
||||
DROP VIEW public.citus_tables;
|
||||
DROP FUNCTION pg_catalog.citus_total_relation_size(regclass,boolean);
|
||||
DROP SEQUENCE citus_unique_id;
|
||||
|
||||
#include "../udfs/citus_total_relation_size/7.0-1.sql"
|
||||
#include "../udfs/upgrade_to_reference_table/8.0-1.sql"
|
||||
|
|
|
@ -0,0 +1,4 @@
|
|||
CREATE SEQUENCE citus.citus_unique_id
|
||||
MINVALUE 1
|
||||
NO CYCLE;
|
||||
ALTER SEQUENCE citus.citus_unique_id SET SCHEMA pg_catalog;
|
|
@ -0,0 +1,4 @@
|
|||
CREATE SEQUENCE citus.citus_unique_id
|
||||
MINVALUE 1
|
||||
NO CYCLE;
|
||||
ALTER SEQUENCE citus.citus_unique_id SET SCHEMA pg_catalog;
|
|
@ -0,0 +1,24 @@
|
|||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "commands/sequence.h"
|
||||
#include "fmgr.h"
|
||||
|
||||
#include "distributed/sequence_utils.h"
|
||||
#include "distributed/coordinator_protocol.h"
|
||||
|
||||
#include "utils/builtins.h"
|
||||
|
||||
|
||||
|
||||
int UniqueId(void) {
|
||||
text *sequenceName = cstring_to_text(CITUS_UNIQUE_ID_SEQUENCE_NAME);
|
||||
Oid sequenceId = ResolveRelationId(sequenceName, false);
|
||||
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
|
||||
|
||||
Datum uniqueIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
|
||||
|
||||
int uniqueId = DatumGetInt64(uniqueIdDatum);
|
||||
|
||||
return uniqueId;
|
||||
}
|
|
@ -56,6 +56,7 @@
|
|||
|
||||
#define SHARDID_SEQUENCE_NAME "pg_dist_shardid_seq"
|
||||
#define PLACEMENTID_SEQUENCE_NAME "pg_dist_placement_placementid_seq"
|
||||
#define CITUS_UNIQUE_ID_SEQUENCE_NAME "citus_unique_id"
|
||||
|
||||
/* Remote call definitions to help with data staging and deletion */
|
||||
#define WORKER_APPLY_SHARD_DDL_COMMAND \
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* sequence_utils.h
|
||||
* sequence related utility functions.
|
||||
*
|
||||
* Copyright (c) Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#ifndef SEQUENCE_UTILS_H_
|
||||
#define SEQUENCE_UTILS_H_
|
||||
|
||||
extern int UniqueId(void);
|
||||
|
||||
#endif
|
|
@ -354,6 +354,29 @@ GROUP BY a, d_hour WITH NO DATA;
|
|||
|
||||
SELECT * FROM mv;
|
||||
|
||||
-- make sure same mv can be created in different schemas without overlap
|
||||
CREATE SCHEMA another_schema;
|
||||
SET search_path to another_schema;
|
||||
SET citus.shard_count TO 4;
|
||||
|
||||
CREATE TABLE events (a int, b int, c double precision, d timestamp, e bigint);
|
||||
|
||||
CREATE MATERIALIZED VIEW mv WITH (citus.cimv) AS
|
||||
SELECT a,
|
||||
date_trunc('hour', d) AS d_hour,
|
||||
min(b) AS min_b,
|
||||
max(b) AS max_b,
|
||||
avg(b) AS avg_b,
|
||||
min(c) AS min_c,
|
||||
max(c) AS max_c,
|
||||
avg(c) AS avg_c,
|
||||
min(e) AS min_e,
|
||||
max(e) AS max_e,
|
||||
avg(e) AS avg_e
|
||||
FROM events
|
||||
WHERE b > 10
|
||||
GROUP BY a, d_hour;
|
||||
|
||||
DROP MATERIALIZED VIEW mv;
|
||||
|
||||
SET client_min_messages TO WARNING; -- suppress cascade messages
|
||||
|
|
Loading…
Reference in New Issue