diff --git a/src/backend/distributed/cimv/create.c b/src/backend/distributed/cimv/create.c index 6ec008aa7..a7c7b7971 100644 --- a/src/backend/distributed/cimv/create.c +++ b/src/backend/distributed/cimv/create.c @@ -17,6 +17,9 @@ #include "distributed/pg_cimv.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" +#include "distributed/security_utils.h" +#include "distributed/sequence_utils.h" +#include "distributed/coordinator_protocol.h" #include "executor/spi.h" #include "miscadmin.h" #include "nodes/makefuncs.h" @@ -64,6 +67,8 @@ typedef struct CitusTableCacheEntry *citusTable; TargetEntry *partitionColumn; bool supportsDelete; + char* prefix; + int prefixId; } CimvCreate; static void CreateCimv(CimvCreate *cimvCreate); @@ -97,6 +102,8 @@ static Oid PartialAggOid(void); static void AppendQuotedLiteral(StringInfo buf, const char *val); static void AppendStringInfoFunction(StringInfo buf, Oid fnoid); static Oid AggregateFunctionOid(const char *functionName, Oid inputType); +static char* CIMVTriggerFuncName(int prefixId, const char* relname); +static char* CIMVInternalPrefix(const RangeVar* baseTable, int prefixId); bool ProcessCreateMaterializedViewStmt(const CreateTableAsStmt *stmt, const char *query_string, @@ -864,28 +871,33 @@ InitializeCimvCreate(const CreateTableAsStmt *stmt, MatViewCreateOptions *create cimvCreate->formCimv->jobid = 0; cimvCreate->formCimv->landingtable = InvalidOid; - namestrcpy(&cimvCreate->formCimv->triggerfnnamespace, CITUS_INTERNAL_SCHEMA); - namestrcpy(&cimvCreate->formCimv->triggerfnname, stmt->into->rel->relname); - - StringInfo mat = makeStringInfo(); - appendStringInfo(mat, "%s_cimv_%s", stmt->into->rel->relname, - MATERIALIZATION_TABLE_SUFFIX); - - StringInfo rv = makeStringInfo(); - appendStringInfo(rv, "%s_cimv_%s", stmt->into->rel->relname, REFRESH_VIEW_SUFFIX); - - StringInfo ld = makeStringInfo(); - appendStringInfo(ld, "%s_cimv_%s", stmt->into->rel->relname, LANDING_TABLE_SUFFIX); Query *query = (Query *) stmt->query; RangeTblEntry *baseRte = (RangeTblEntry *) linitial(query->rtable); + cimvCreate->baseTableName = makeRangeVar(get_namespace_name(get_rel_namespace( + baseRte->relid)), + get_rel_name(baseRte->relid), -1); + cimvCreate->stmt = stmt; cimvCreate->createOptions = createOptions; - cimvCreate->baseTableName = makeRangeVar(get_namespace_name(get_rel_namespace( - baseRte->relid)), - get_rel_name(baseRte->relid), -1); + cimvCreate->formCimv->basetable = baseRte->relid; + + cimvCreate->prefixId = UniqueId(); + cimvCreate->prefix = CIMVInternalPrefix(cimvCreate->baseTableName, cimvCreate->prefixId); + namestrcpy(&cimvCreate->formCimv->triggerfnnamespace, CITUS_INTERNAL_SCHEMA); + char* funcName = CIMVTriggerFuncName(cimvCreate->prefixId, stmt->into->rel->relname); + namestrcpy(&cimvCreate->formCimv->triggerfnname, funcName); + StringInfo mat = makeStringInfo(); + appendStringInfo(mat, "%s_cimv_%s", cimvCreate->prefix, MATERIALIZATION_TABLE_SUFFIX); + + StringInfo rv = makeStringInfo(); + appendStringInfo(rv, "%s_cimv_%s", cimvCreate->prefix, REFRESH_VIEW_SUFFIX); + + StringInfo ld = makeStringInfo(); + appendStringInfo(ld, "%s_cimv_%s", cimvCreate->prefix, LANDING_TABLE_SUFFIX); + cimvCreate->matTableName = makeRangeVar(CITUS_INTERNAL_SCHEMA, mat->data, -1); cimvCreate->userViewName = stmt->into->rel; cimvCreate->refreshViewName = makeRangeVar(CITUS_INTERNAL_SCHEMA, rv->data, -1); @@ -918,6 +930,24 @@ InitializeCimvCreate(const CreateTableAsStmt *stmt, MatViewCreateOptions *create return cimvCreate; } +static char* CIMVTriggerFuncName(int prefixId, const char* relname) { + StringInfo funcName = makeStringInfo(); + appendStringInfo(funcName, "%s_%d",quote_identifier(relname), prefixId); + return funcName->data; +} + +static char* CIMVInternalPrefix(const RangeVar* baseTable, int prefixId) { + + if (baseTable->schemaname == NULL || baseTable->relname == NULL) { + ereport(ERROR, (errmsg("unexpected state: schema name or relname not found."))); + } + + StringInfo prefix = makeStringInfo(); + appendStringInfo(prefix, "%s_%s_%d",quote_identifier(baseTable->schemaname), + quote_identifier(baseTable->relname), prefixId); + return prefix->data; +} + static MatViewCreateOptions * GetMatViewCreateOptions(const CreateTableAsStmt *stmt) diff --git a/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql b/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql index 41cfb951f..8c47387ab 100644 --- a/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql +++ b/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql @@ -6,5 +6,6 @@ DROP FUNCTION IF EXISTS pg_catalog.citus_total_relation_size(regclass); #include "udfs/citus_total_relation_size/10.0-1.sql" #include "udfs/citus_tables/10.0-1.sql" #include "udfs/citus_finish_pg_upgrade/10.0-1.sql" +#include "udfs/citus_unique_id/10.0-1.sql" #include "../../columnar/sql/columnar--9.5-1--10.0-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--10.0-1--9.5-1.sql b/src/backend/distributed/sql/downgrades/citus--10.0-1--9.5-1.sql index 8b028c280..917139d31 100644 --- a/src/backend/distributed/sql/downgrades/citus--10.0-1--9.5-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--10.0-1--9.5-1.sql @@ -7,6 +7,7 @@ DROP VIEW public.citus_tables; DROP FUNCTION pg_catalog.citus_total_relation_size(regclass,boolean); +DROP SEQUENCE citus_unique_id; #include "../udfs/citus_total_relation_size/7.0-1.sql" #include "../udfs/upgrade_to_reference_table/8.0-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_unique_id/10.0-1.sql b/src/backend/distributed/sql/udfs/citus_unique_id/10.0-1.sql new file mode 100644 index 000000000..4990ddedc --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_unique_id/10.0-1.sql @@ -0,0 +1,4 @@ +CREATE SEQUENCE citus.citus_unique_id + MINVALUE 1 + NO CYCLE; +ALTER SEQUENCE citus.citus_unique_id SET SCHEMA pg_catalog; \ No newline at end of file diff --git a/src/backend/distributed/sql/udfs/citus_unique_id/latest.sql b/src/backend/distributed/sql/udfs/citus_unique_id/latest.sql new file mode 100644 index 000000000..4990ddedc --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_unique_id/latest.sql @@ -0,0 +1,4 @@ +CREATE SEQUENCE citus.citus_unique_id + MINVALUE 1 + NO CYCLE; +ALTER SEQUENCE citus.citus_unique_id SET SCHEMA pg_catalog; \ No newline at end of file diff --git a/src/backend/distributed/utils/sequence_utils.c b/src/backend/distributed/utils/sequence_utils.c new file mode 100644 index 000000000..bdaaf7844 --- /dev/null +++ b/src/backend/distributed/utils/sequence_utils.c @@ -0,0 +1,24 @@ + +#include "postgres.h" + +#include "commands/sequence.h" +#include "fmgr.h" + +#include "distributed/sequence_utils.h" +#include "distributed/coordinator_protocol.h" + +#include "utils/builtins.h" + + + +int UniqueId(void) { + text *sequenceName = cstring_to_text(CITUS_UNIQUE_ID_SEQUENCE_NAME); + Oid sequenceId = ResolveRelationId(sequenceName, false); + Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); + + Datum uniqueIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); + + int uniqueId = DatumGetInt64(uniqueIdDatum); + + return uniqueId; +} \ No newline at end of file diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h index f871db5b2..d67fc9bd4 100644 --- a/src/include/distributed/coordinator_protocol.h +++ b/src/include/distributed/coordinator_protocol.h @@ -56,6 +56,7 @@ #define SHARDID_SEQUENCE_NAME "pg_dist_shardid_seq" #define PLACEMENTID_SEQUENCE_NAME "pg_dist_placement_placementid_seq" +#define CITUS_UNIQUE_ID_SEQUENCE_NAME "citus_unique_id" /* Remote call definitions to help with data staging and deletion */ #define WORKER_APPLY_SHARD_DDL_COMMAND \ diff --git a/src/include/distributed/sequence_utils.h b/src/include/distributed/sequence_utils.h new file mode 100644 index 000000000..23e6521bc --- /dev/null +++ b/src/include/distributed/sequence_utils.h @@ -0,0 +1,16 @@ +/*------------------------------------------------------------------------- + * + * sequence_utils.h + * sequence related utility functions. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef SEQUENCE_UTILS_H_ +#define SEQUENCE_UTILS_H_ + +extern int UniqueId(void); + +#endif diff --git a/src/test/regress/sql/cimv.sql b/src/test/regress/sql/cimv.sql index dc4d65496..a79c45951 100644 --- a/src/test/regress/sql/cimv.sql +++ b/src/test/regress/sql/cimv.sql @@ -354,6 +354,29 @@ GROUP BY a, d_hour WITH NO DATA; SELECT * FROM mv; +-- make sure same mv can be created in different schemas without overlap +CREATE SCHEMA another_schema; +SET search_path to another_schema; +SET citus.shard_count TO 4; + +CREATE TABLE events (a int, b int, c double precision, d timestamp, e bigint); + +CREATE MATERIALIZED VIEW mv WITH (citus.cimv) AS +SELECT a, + date_trunc('hour', d) AS d_hour, + min(b) AS min_b, + max(b) AS max_b, + avg(b) AS avg_b, + min(c) AS min_c, + max(c) AS max_c, + avg(c) AS avg_c, + min(e) AS min_e, + max(e) AS max_e, + avg(e) AS avg_e +FROM events +WHERE b > 10 +GROUP BY a, d_hour; + DROP MATERIALIZED VIEW mv; SET client_min_messages TO WARNING; -- suppress cascade messages