From ef724edf3151b2eb31eedc25d574b017b051aa76 Mon Sep 17 00:00:00 2001 From: Burak Velioglu Date: Thu, 19 Aug 2021 13:18:48 +0300 Subject: [PATCH] Introduce create_timeseries_table with related checks --- src/backend/distributed/Makefile | 2 + .../distributed/sql/citus--10.1-1--10.2-1.sql | 3 + .../timeseries/create_timeseries_table.c | 280 ++++++++++++++++++ .../sql/timeseries--10.1-1--10.2-1.sql | 21 ++ .../udfs/create_missing_partitions/10.2-1.sql | 86 ++++++ .../udfs/create_missing_partitions/latest.sql | 88 ++++++ .../udfs/create_timeseries_table/10.2-1.sql | 23 ++ .../udfs/create_timeseries_table/latest.sql | 19 ++ 8 files changed, 522 insertions(+) create mode 100644 src/backend/timeseries/create_timeseries_table.c create mode 100644 src/backend/timeseries/sql/timeseries--10.1-1--10.2-1.sql create mode 100644 src/backend/timeseries/sql/udfs/create_missing_partitions/10.2-1.sql create mode 100644 src/backend/timeseries/sql/udfs/create_missing_partitions/latest.sql create mode 100644 src/backend/timeseries/sql/udfs/create_timeseries_table/10.2-1.sql create mode 100644 src/backend/timeseries/sql/udfs/create_timeseries_table/latest.sql diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index a2ffcc142..d30912d9c 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -21,6 +21,8 @@ DATA_built = $(generated_sql_files) SUBDIRS = . 
commands connection ddl deparser executor metadata operations planner progress relay safeclib test transaction utils worker # columnar modules SUBDIRS += ../columnar +# timeseries modules +SUBDIRS += ../timeseries # enterprise modules SUBDIRS += diff --git a/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql b/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql index c2b779d9f..546494f3a 100644 --- a/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql +++ b/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql @@ -16,3 +16,6 @@ ALTER TABLE pg_catalog.pg_dist_placement ADD CONSTRAINT placement_shardid_groupi #include "udfs/citus_internal_update_placement_metadata/10.2-1.sql"; #include "udfs/citus_internal_delete_shard_metadata/10.2-1.sql"; #include "udfs/citus_internal_update_relation_colocation/10.2-1.sql"; +#include "../../timeseries/sql/timeseries--10.1-1--10.2-1.sql" +#include "../../timeseries/sql/udfs/create_timeseries_table/10.2-1.sql" +#include "../../timeseries/sql/udfs/create_missing_partitions/10.2-1.sql" diff --git a/src/backend/timeseries/create_timeseries_table.c b/src/backend/timeseries/create_timeseries_table.c new file mode 100644 index 000000000..a5be2f324 --- /dev/null +++ b/src/backend/timeseries/create_timeseries_table.c @@ -0,0 +1,280 @@ +/*------------------------------------------------------------------------- + * + * create_timeseries_table.c + * Routines related to the creation of timeseries relations. + * + * Copyright (c) Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "miscadmin.h" +#include "fmgr.h" + +#include "access/genam.h" +#include "catalog/pg_am.h" +#include "catalog/pg_namespace.h" +#include "catalog/pg_partitioned_table.h" +#include "catalog/pg_type.h" +#include "commands/extension.h" +#include "distributed/listutils.h" +#include "distributed/metadata_cache.h" +#include "distributed/multi_partitioning_utils.h" +#include "distributed/resource_lock.h" +#include "executor/spi.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/lsyscache.h" +#include "utils/partcache.h" +#include "utils/syscache.h" +#include "utils/timestamp.h" + +#define Natts_citus_timeseries_tables 6 + +#define INTERVAL_TO_SEC(ivp) \ + (((double) (ivp)->time) / ((double) USECS_PER_SEC) + \ + (ivp)->day * (24.0 * SECS_PER_HOUR) + \ + (ivp)->month * (30.0 * SECS_PER_DAY)) + +PG_FUNCTION_INFO_V1(create_timeseries_table); + +static Oid TimeseriesNamespaceId(void); +static Oid CitusTimeseriesTablesRelationId(); +static void InitiateTimeseriesTablePartitions(Oid relationId); +static void InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval, int preMakePartitionCount, + int postMakePartitionCount, Interval *compressionThresholdInterval, + Interval *retentionThresholdInterval); +static void ErrorIfNotSuitableToConvertTimeseriesTable(Oid relationId, Interval *partitionInterval, + Interval *compresstionThresholdInterval, + Interval *retentionThresholdInterval); +static bool CheckIntervalAlignnmentWithPartitionKey(PartitionKey partitionKey, Interval *partitionInterval); +static bool CheckIntervalAlignmentWithThresholds(Interval *partitionInterval, Interval *compressionThreshold, Interval *retentionThreshold); + +/* + * create_timeseries_table gets a table name, partition interval + * optional pre and post make partition counts, compression and retention threshold + * then it creates a timeseries 
table.
+ *
+ * The function returns without doing anything when the table or the partition
+ * interval is NULL. It errors out when one of the partition counts is NULL,
+ * since the counts are stored in NOT NULL columns of citus_timeseries_tables.
+ */
+Datum
+create_timeseries_table(PG_FUNCTION_ARGS)
+{
+	CheckCitusVersion(ERROR);
+
+	if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
+	{
+		PG_RETURN_VOID();
+	}
+
+	/* the counts have defaults, but an explicit NULL can still be passed */
+	if (PG_ARGISNULL(2) || PG_ARGISNULL(3))
+	{
+		ereport(ERROR, (errmsg("premake and postmake interval counts can not be NULL")));
+	}
+
+	Oid relationId = PG_GETARG_OID(0);
+	Interval *partitionInterval = PG_GETARG_INTERVAL_P(1);
+	int preMakePartitionCount = PG_GETARG_INT32(2);
+	int postMakePartitionCount = PG_GETARG_INT32(3);
+	Interval *compressionThresholdInterval = NULL;
+	Interval *retentionThresholdInterval = NULL;
+
+	if (!PG_ARGISNULL(4))
+	{
+		compressionThresholdInterval = PG_GETARG_INTERVAL_P(4);
+	}
+
+	if (!PG_ARGISNULL(5))
+	{
+		retentionThresholdInterval = PG_GETARG_INTERVAL_P(5);
+	}
+
+	ErrorIfNotSuitableToConvertTimeseriesTable(relationId, partitionInterval,
+											   compressionThresholdInterval,
+											   retentionThresholdInterval);
+
+	InsertIntoCitusTimeseriesTables(relationId, partitionInterval, preMakePartitionCount,
+									postMakePartitionCount, compressionThresholdInterval,
+									retentionThresholdInterval);
+
+	InitiateTimeseriesTablePartitions(relationId);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * ErrorIfNotSuitableToConvertTimeseriesTable errors out if the given relation
+ * can not be converted to a timeseries table with the given intervals. The
+ * relation must be empty and range partitioned on a single date, timestamp or
+ * timestamptz column. The intervals must be ordered as retention threshold >
+ * compression threshold > partition interval, and must be whole days when the
+ * partition column is a date. Thresholds that are NULL are not checked.
+ */
+static void
+ErrorIfNotSuitableToConvertTimeseriesTable(Oid relationId, Interval *partitionInterval,
+										   Interval *compressionThresholdInterval,
+										   Interval *retentionThresholdInterval)
+{
+	if (!TableEmpty(relationId))
+	{
+		ereport(ERROR, (errmsg("non-empty tables can not be converted to timeseries table")));
+	}
+
+	if (!PartitionedTable(relationId))
+	{
+		ereport(ERROR, (errmsg("non-partitioned tables can not be converted to timeseries table")));
+	}
+
+	Relation partitionedRelation = table_open(relationId, AccessShareLock);
+	PartitionKey partitionKey = RelationGetPartitionKey(partitionedRelation);
+
+	if (partitionKey->strategy != PARTITION_STRATEGY_RANGE)
+	{
+		ereport(ERROR, (errmsg("table must be partitioned by range to convert it to timeseries table")));
+	}
+
+	if (partitionKey->partnatts != 1)
+	{
+		ereport(ERROR, (errmsg("table must be partitioned by single column to convert it to timeseries table")));
+	}
+
+	/*
+	 * Check the column type on its own, so that unsupported types do not end
+	 * up with the "must be multiple days" errors below, which only make sense
+	 * for date columns.
+	 */
+	Oid partitionColumnTypeId = partitionKey->parttypid[0];
+	if (partitionColumnTypeId != DATEOID &&
+		partitionColumnTypeId != TIMESTAMPOID &&
+		partitionColumnTypeId != TIMESTAMPTZOID)
+	{
+		ereport(ERROR, (errmsg("partition column of a timeseries table must be of type date, timestamp or timestamptz")));
+	}
+
+	/* a zero or negative interval can not describe a valid partition range */
+	if (INTERVAL_TO_SEC(partitionInterval) <= 0)
+	{
+		ereport(ERROR, (errmsg("partition interval must be positive")));
+	}
+
+	if (!CheckIntervalAlignmentWithThresholds(partitionInterval, compressionThresholdInterval, retentionThresholdInterval))
+	{
+		ereport(ERROR, (errmsg("must be retention threshold > compression threshold > partition interval")));
+	}
+
+	if (!CheckIntervalAlignnmentWithPartitionKey(partitionKey, partitionInterval))
+	{
+		ereport(ERROR, (errmsg("partition interval for partition on date must be multiple days")));
+	}
+
+	if (compressionThresholdInterval != NULL && !CheckIntervalAlignnmentWithPartitionKey(partitionKey, compressionThresholdInterval))
+	{
+		ereport(ERROR, (errmsg("compression threshold interval for table partitioned on date must be multiple days")));
+	}
+
+	if (retentionThresholdInterval != NULL && !CheckIntervalAlignnmentWithPartitionKey(partitionKey, retentionThresholdInterval))
+	{
+		ereport(ERROR, (errmsg("retention threshold interval for table partitioned on date must be multiple days")));
+	}
+
+	/* close the relation but do not release the lock here */
+	table_close(partitionedRelation, NoLock);
+}
+
+/*
+ * Compare partition interval, compression threshold and retention threshold. Note that
+ * compression threshold or retention threshold can be null, in which case the
+ * comparisons involving the missing threshold are treated as satisfied.
+ *
+ * Intervals are compared through INTERVAL_TO_SEC, which counts a month as 30 days.
+ */
+static bool
+CheckIntervalAlignmentWithThresholds(Interval *partitionInterval, Interval *compressionThreshold, Interval *retentionThreshold)
+{
+	bool compressionGreaterThanInterval = true;
+	bool retentionGreaterThanInterval = true;
+	bool retentionGreaterThanCompression = true;
+
+	if (compressionThreshold != NULL)
+	{
+		compressionGreaterThanInterval =
+			INTERVAL_TO_SEC(compressionThreshold) > INTERVAL_TO_SEC(partitionInterval);
+	}
+
+	if (retentionThreshold != NULL)
+	{
+		retentionGreaterThanInterval =
+			INTERVAL_TO_SEC(retentionThreshold) > INTERVAL_TO_SEC(partitionInterval);
+	}
+
+	if (compressionThreshold != NULL && retentionThreshold != NULL)
+	{
+		retentionGreaterThanCompression =
+			INTERVAL_TO_SEC(retentionThreshold) > INTERVAL_TO_SEC(compressionThreshold);
+	}
+
+	return compressionGreaterThanInterval && retentionGreaterThanInterval && retentionGreaterThanCompression;
+}
+
+/*
+ * Check whether the given interval aligns with the partition column of the table.
+ *
+ * For a date column the interval must not have a time-of-day part. Any interval
+ * is accepted for timestamp and timestamptz columns. All other column types
+ * return false.
+ *
+ * The type is matched on its OID, so no syscache tuple has to be looked up
+ * (and read after being released) to get the type name.
+ */
+static bool
+CheckIntervalAlignnmentWithPartitionKey(PartitionKey partitionKey, Interval *partitionInterval)
+{
+	Oid partTypeId = partitionKey->parttypid[0];
+
+	switch (partTypeId)
+	{
+		case DATEOID:
+		{
+			return partitionInterval->time == 0;
+		}
+
+		case TIMESTAMPOID:
+		case TIMESTAMPTZOID:
+		{
+			return true;
+		}
+
+		default:
+		{
+			return false;
+		}
+	}
+}
+
+/*
+ * Create the initial pre and post make partitions for the given relation id
+ * by getting the related information from citus_timeseries_tables and utilizing
+ * create_missing_partitions.
+ *
+ * Errors out if the SPI connection can not be established or the query fails,
+ * so that the metadata row is not left behind without any partitions.
+ */
+static void
+InitiateTimeseriesTablePartitions(Oid relationId)
+{
+	bool readOnly = false;
+	StringInfo initiateTimeseriesPartitionsCommand = makeStringInfo();
+
+	/* Oid is unsigned, so it is formatted with %u */
+	appendStringInfo(initiateTimeseriesPartitionsCommand,
+					 "SELECT pg_catalog.create_missing_partitions(logicalrelid, "
+					 "now() + partitioninterval * postmakeintervalcount, "
+					 "now() - partitioninterval * premakeintervalcount) "
+					 "from citus_timeseries.citus_timeseries_tables WHERE logicalrelid = %u;",
+					 relationId);
+
+	int spiConnectionResult = SPI_connect();
+	if (spiConnectionResult != SPI_OK_CONNECT)
+	{
+		/* nothing to finish and nothing can be executed without a connection */
+		ereport(ERROR, (errmsg("could not connect to SPI manager to initiate timeseries table partitions")));
+	}
+
+	int spiQueryResult = SPI_execute(initiateTimeseriesPartitionsCommand->data, readOnly, 0);
+	if (spiQueryResult != SPI_OK_SELECT)
+	{
+		ereport(ERROR, (errmsg("could not initiate timeseries table partitions: %s",
+							   SPI_result_code_string(spiQueryResult))));
+	}
+
+	SPI_finish();
+}
+
+/*
+ * Add tuples for the given table to the
citus_timeseries_tables using given params
+ *
+ * The compression and retention thresholds are stored as NULL when they are
+ * not given.
+ */
+static void
+InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval, int preMakePartitionCount, int postMakePartitionCount,
+								Interval *compressionThresholdInterval, Interval *retentionThresholdInterval)
+{
+	Datum values[Natts_citus_timeseries_tables] = { 0 };
+	bool nulls[Natts_citus_timeseries_tables] = { false };
+
+	Oid metadataRelationId = CitusTimeseriesTablesRelationId();
+	Relation timeseriesMetadata = table_open(metadataRelationId, RowExclusiveLock);
+
+	/* columns that are always set */
+	values[0] = ObjectIdGetDatum(relationId);
+	values[1] = IntervalPGetDatum(partitionInterval);
+	values[2] = Int32GetDatum(preMakePartitionCount);
+	values[3] = Int32GetDatum(postMakePartitionCount);
+
+	/* optional thresholds */
+	nulls[4] = (compressionThresholdInterval == NULL);
+	if (!nulls[4])
+	{
+		values[4] = IntervalPGetDatum(compressionThresholdInterval);
+	}
+
+	nulls[5] = (retentionThresholdInterval == NULL);
+	if (!nulls[5])
+	{
+		values[5] = IntervalPGetDatum(retentionThresholdInterval);
+	}
+
+	TupleDesc metadataDescriptor = RelationGetDescr(timeseriesMetadata);
+	HeapTuple metadataTuple = heap_form_tuple(metadataDescriptor, values, nulls);
+	CatalogTupleInsert(timeseriesMetadata, metadataTuple);
+
+	table_close(timeseriesMetadata, NoLock);
+}
+
+/*
+ * Get the relation id for citus_timeseries_tables metadata table, erroring
+ * out if the table can not be found in the timeseries schema.
+ */
+static Oid
+CitusTimeseriesTablesRelationId()
+{
+	Oid timeseriesSchemaId = TimeseriesNamespaceId();
+	Oid metadataRelationId = get_relname_relid("citus_timeseries_tables", timeseriesSchemaId);
+
+	if (!OidIsValid(metadataRelationId))
+	{
+		ereport(ERROR, (errmsg("cache lookup failed for citus_timeseries_tables, called too early?")));
+	}
+
+	return metadataRelationId;
+}
+
+/*
+ * TimeseriesNamespaceId returns namespace id of the schema we store timeseries
+ * related metadata tables.
+ */
+static Oid
+TimeseriesNamespaceId(void)
+{
+	/* second argument is missing_ok = false */
+	return get_namespace_oid("citus_timeseries", false);
+}
\ No newline at end of file
diff --git a/src/backend/timeseries/sql/timeseries--10.1-1--10.2-1.sql b/src/backend/timeseries/sql/timeseries--10.1-1--10.2-1.sql
new file mode 100644
index 000000000..0a98e00c2
--- /dev/null
+++ b/src/backend/timeseries/sql/timeseries--10.1-1--10.2-1.sql
@@ -0,0 +1,21 @@
+/* timeseries--10.1-1--10.2-1.sql */
+
+CREATE SCHEMA citus_timeseries;
+SET search_path TO citus_timeseries;
+
+CREATE TABLE citus_timeseries_tables (
+    logicalrelid regclass NOT NULL PRIMARY KEY,
+    partitioninterval INTERVAL NOT NULL,
+    premakeintervalcount INT NOT NULL,
+    postmakeintervalcount INT NOT NULL,
+    compressionthreshold INTERVAL,
+    retentionthreshold INTERVAL
+);
+
+COMMENT ON TABLE citus_timeseries_tables IS 'Keeps interval and threshold informations for timeseries tables';
+
+-- grant read access for timeseries metadata tables to unprivileged user
+GRANT USAGE ON SCHEMA citus_timeseries TO PUBLIC;
+GRANT SELECT ON ALL tables IN SCHEMA citus_timeseries TO PUBLIC;
+
+RESET search_path;
diff --git a/src/backend/timeseries/sql/udfs/create_missing_partitions/10.2-1.sql b/src/backend/timeseries/sql/udfs/create_missing_partitions/10.2-1.sql
new file mode 100644
index 000000000..b5ca3d9b7
--- /dev/null
+++ b/src/backend/timeseries/sql/udfs/create_missing_partitions/10.2-1.sql
@@ -0,0 +1,107 @@
+CREATE OR REPLACE FUNCTION pg_catalog.create_missing_partitions(
+    table_name regclass,
+    to_date timestamptz,
+    start_from timestamptz DEFAULT NULL)
+returns boolean
+LANGUAGE plpgsql
+AS $$
+DECLARE
+    table_partition_interval INTERVAL;
+    table_schema_name name;
+    table_relation_name name;
+    current_range_from_value timestamptz;
+    current_range_to_value timestamptz;
+    current_partition_name text;
+    current_partition_count int;
+BEGIN
+    /*
+     * TODO: Check whether the table is timeseries table with INTERVAL partition range
+     * and timestamptz partition column
+     */
+
+    SELECT partitioninterval
+    INTO table_partition_interval
+    FROM citus_timeseries.citus_timeseries_tables
+    WHERE logicalrelid = table_name;
+
+    -- tables that are not in citus_timeseries_tables have no interval to work with
+    IF table_partition_interval IS NULL THEN
+        RAISE EXCEPTION '% is not a timeseries table', table_name::text;
+    END IF;
+
+    /*
+     * Partitions are created in the schema of the parent table. The schema and
+     * relation names are read from the catalog since table_name::text is already
+     * quoted (and schema-qualified when needed), so it can not be passed through
+     * %I again.
+     */
+    SELECT n.nspname, c.relname
+    INTO table_schema_name, table_relation_name
+    FROM pg_catalog.pg_class c
+    JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
+    WHERE c.oid = table_name;
+
+    /*
+     * Then check if start_from is given. If it is, create the partition for that time
+     * and let remaining of the function fill the gap from start_from to to_date.
+     *
+     * TODO: That part is implemented by assuming that there is no partition exist for
+     * the given table, in other words that function will be called via create_timeseries_table
+     * only.
+     *
+     * TODO: Handle date trunc according to given interval of the timeseries table
+     */
+    IF start_from IS NOT NULL THEN
+        RAISE DEBUG 'IN START FROM';
+        current_partition_count := 0;
+        current_range_from_value := start_from;
+        current_range_to_value := start_from + table_partition_interval;
+        current_partition_name := table_relation_name || '_' || current_partition_count::text;
+
+        -- %L quotes the range bounds as literals, %I is only for identifiers
+        EXECUTE format('CREATE TABLE %I.%I PARTITION OF %s FOR VALUES FROM (%L) TO (%L)', table_schema_name, current_partition_name, table_name::text, current_range_from_value::text, current_range_to_value::text);
+    END IF;
+
+    /*
+     * At this point, it is assumed that initial partition of the timeseries table
+     * exists. Remaining partitions till to_date will be created if any partition
+     * missing
+     */
+    SELECT from_value::timestamptz, to_value::timestamptz
+    INTO current_range_from_value, current_range_to_value
+    FROM pg_catalog.time_partitions
+    WHERE parent_table = table_name
+    ORDER BY from_value::timestamptz
+    LIMIT 1;
+
+    SELECT count(*)
+    INTO current_partition_count
+    FROM pg_catalog.time_partitions
+    WHERE parent_table = table_name;
+
+    RAISE DEBUG 'current_range_from_value %', current_range_from_value;
+
+    WHILE current_range_from_value < to_date LOOP
+        current_range_from_value := current_range_to_value;
+        current_range_to_value := current_range_to_value + table_partition_interval;
+        current_partition_name := table_relation_name || '_' || current_partition_count::text;
+
+        -- TODO: Create that using attach partition, which can be implemented with another UDF to make sure that it gets light locks.
+        BEGIN
+            EXECUTE format('CREATE TABLE %I.%I PARTITION OF %s FOR VALUES FROM (%L) TO (%L)', table_schema_name, current_partition_name, table_name::text, current_range_from_value::text, current_range_to_value::text);
+        EXCEPTION WHEN OTHERS THEN
+            -- best effort: keep going with the next range, but report the actual error
+            RAISE NOTICE 'could not create partition for the range from % to %: %', current_range_from_value::text, current_range_to_value::text, SQLERRM;
+        END;
+
+        current_partition_count := current_partition_count + 1;
+    END LOOP;
+
+    return true;
+END;
+$$;
+COMMENT ON FUNCTION pg_catalog.create_missing_partitions(
+    table_name regclass,
+    to_date timestamptz,
+    start_from timestamptz)
+IS 'create missing partitions for the given timeseries table';
\ No newline at end of file
diff --git a/src/backend/timeseries/sql/udfs/create_missing_partitions/latest.sql b/src/backend/timeseries/sql/udfs/create_missing_partitions/latest.sql
new file mode 100644
index 000000000..b83a4a24f
--- /dev/null
+++ b/src/backend/timeseries/sql/udfs/create_missing_partitions/latest.sql
@@ -0,0 +1,107 @@
+CREATE OR REPLACE FUNCTION pg_catalog.create_missing_partitions(
+    table_name regclass,
+    to_date timestamptz,
+    start_from timestamptz DEFAULT NULL)
+returns boolean
+LANGUAGE plpgsql
+AS $$
+DECLARE
+    table_partition_interval INTERVAL;
+    table_schema_name name;
+    table_relation_name name;
+    current_range_from_value timestamptz;
+    current_range_to_value timestamptz;
+    current_partition_name text;
+    current_partition_count int;
+BEGIN
+    /*
+     * TODO: Check whether the table is timeseries table with INTERVAL partition range
+     * and timestamptz partition column
+     */
+
+    SELECT partitioninterval
+    INTO table_partition_interval
+    FROM citus_timeseries.citus_timeseries_tables
+    WHERE logicalrelid = table_name;
+
+    -- tables that are not in citus_timeseries_tables have no interval to work with
+    IF table_partition_interval IS NULL THEN
+        RAISE EXCEPTION '% is not a timeseries table', table_name::text;
+    END IF;
+
+    /*
+     * Partitions are created in the schema of the parent table. The schema and
+     * relation names are read from the catalog since table_name::text is already
+     * quoted (and schema-qualified when needed), so it can not be passed through
+     * %I again.
+     */
+    SELECT n.nspname, c.relname
+    INTO table_schema_name, table_relation_name
+    FROM pg_catalog.pg_class c
+    JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
+    WHERE c.oid = table_name;
+
+    /*
+     * Then check if start_from is given. If it is, create the partition for that time
+     * and let remaining of the function fill the gap from start_from to to_date.
+     *
+     * TODO: That part is implemented by assuming that there is no partition exist for
+     * the given table, in other words that function will be called via create_timeseries_table
+     * only.
+     *
+     * TODO: Handle date trunc according to given interval of the timeseries table
+     */
+    IF start_from IS NOT NULL THEN
+        RAISE DEBUG 'IN START FROM';
+        current_partition_count := 0;
+        current_range_from_value := start_from;
+        current_range_to_value := start_from + table_partition_interval;
+        current_partition_name := table_relation_name || '_' || current_partition_count::text;
+
+        -- %L quotes the range bounds as literals, %I is only for identifiers
+        EXECUTE format('CREATE TABLE %I.%I PARTITION OF %s FOR VALUES FROM (%L) TO (%L)', table_schema_name, current_partition_name, table_name::text, current_range_from_value::text, current_range_to_value::text);
+    END IF;
+
+    /*
+     * At this point, it is assumed that initial partition of the timeseries table
+     * exists. Remaining partitions till to_date will be created if any partition
+     * missing
+     */
+    SELECT from_value::timestamptz, to_value::timestamptz
+    INTO current_range_from_value, current_range_to_value
+    FROM pg_catalog.time_partitions
+    WHERE parent_table = table_name
+    ORDER BY from_value::timestamptz
+    LIMIT 1;
+
+    SELECT count(*)
+    INTO current_partition_count
+    FROM pg_catalog.time_partitions
+    WHERE parent_table = table_name;
+
+    RAISE DEBUG 'current_range_from_value %', current_range_from_value;
+
+    WHILE current_range_from_value < to_date LOOP
+        current_range_from_value := current_range_to_value;
+        current_range_to_value := current_range_to_value + table_partition_interval;
+        current_partition_name := table_relation_name || '_' || current_partition_count::text;
+
+        -- TODO: Create that using attach partition, which can be implemented with another UDF to make sure that it gets light locks.
+        BEGIN
+            EXECUTE format('CREATE TABLE %I.%I PARTITION OF %s FOR VALUES FROM (%L) TO (%L)', table_schema_name, current_partition_name, table_name::text, current_range_from_value::text, current_range_to_value::text);
+        EXCEPTION WHEN OTHERS THEN
+            -- best effort: keep going with the next range, but report the actual error
+            RAISE NOTICE 'could not create partition for the range from % to %: %', current_range_from_value::text, current_range_to_value::text, SQLERRM;
+        END;
+
+        current_partition_count := current_partition_count + 1;
+    END LOOP;
+
+    return true;
+END;
+$$;
+COMMENT ON FUNCTION pg_catalog.create_missing_partitions(
+    table_name regclass,
+    to_date timestamptz,
+    start_from timestamptz)
+IS 'create missing partitions for the given timeseries table';
\ No newline at end of file
diff --git a/src/backend/timeseries/sql/udfs/create_timeseries_table/10.2-1.sql b/src/backend/timeseries/sql/udfs/create_timeseries_table/10.2-1.sql
new file mode 100644
index 000000000..b2b0d3d4c
--- /dev/null
+++ b/src/backend/timeseries/sql/udfs/create_timeseries_table/10.2-1.sql
@@ -0,0 +1,23 @@
+CREATE OR REPLACE FUNCTION pg_catalog.create_timeseries_table(
+    table_name regclass,
+    partition_interval INTERVAL,
+    premake_interval_count int DEFAULT 7,
+    postmake_interval_count int DEFAULT 7,
+    compression_threshold INTERVAL DEFAULT NULL,
+    retention_threshold INTERVAL DEFAULT NULL) -- can change the order with compression, raise a message about dropping data
+    RETURNS void
+    LANGUAGE C
+AS 'MODULE_PATHNAME', 'create_timeseries_table';
+
+COMMENT ON FUNCTION pg_catalog.create_timeseries_table(
+    table_name regclass,
+    partition_interval INTERVAL,
+    premake_interval_count int,
+    postmake_interval_count int,
+    compression_threshold INTERVAL,
+    retention_threshold INTERVAL)
+IS 'creates a citus timeseries table which will be autopartitioned';
+
+-- Add UDF to undo that! - keep in mind but plan depending on the resources
+-- Add note about alter won't change existing tables
+-- one function instead of multiple for compression and expiration
\ No newline at end of file
diff --git a/src/backend/timeseries/sql/udfs/create_timeseries_table/latest.sql b/src/backend/timeseries/sql/udfs/create_timeseries_table/latest.sql
new file mode 100644
index 000000000..35c7ca55f
--- /dev/null
+++ b/src/backend/timeseries/sql/udfs/create_timeseries_table/latest.sql
@@ -0,0 +1,19 @@
+CREATE OR REPLACE FUNCTION pg_catalog.create_timeseries_table(
+    table_name regclass,
+    partition_interval INTERVAL,
+    premake_interval_count int DEFAULT 7,
+    postmake_interval_count int DEFAULT 7,
+    compression_threshold INTERVAL DEFAULT NULL,
+    retention_threshold INTERVAL DEFAULT NULL)
+    RETURNS void
+    LANGUAGE C
+AS 'MODULE_PATHNAME', 'create_timeseries_table';
+
+COMMENT ON FUNCTION pg_catalog.create_timeseries_table(
+    table_name regclass,
+    partition_interval INTERVAL,
+    premake_interval_count int,
+    postmake_interval_count int,
+    compression_threshold INTERVAL,
+    retention_threshold INTERVAL)
+IS 'creates a citus timeseries table which will be autopartitioned';