diff --git a/src/backend/timeseries/create_timeseries_table.c b/src/backend/timeseries/create_timeseries_table.c index a5be2f324..f0183209f 100644 --- a/src/backend/timeseries/create_timeseries_table.c +++ b/src/backend/timeseries/create_timeseries_table.c @@ -1,7 +1,7 @@ /*------------------------------------------------------------------------- * * create_timeseries_table.c - * Routines related to the creation of timeseries relations. + * Routines related to the creation of timeseries tables. * * Copyright (c) Citus Data, Inc. * @@ -18,10 +18,6 @@ #include "catalog/pg_partitioned_table.h" #include "catalog/pg_type.h" #include "commands/extension.h" -#include "distributed/listutils.h" -#include "distributed/metadata_cache.h" -#include "distributed/multi_partitioning_utils.h" -#include "distributed/resource_lock.h" #include "executor/spi.h" #include "utils/builtins.h" #include "utils/fmgroids.h" @@ -30,17 +26,16 @@ #include "utils/syscache.h" #include "utils/timestamp.h" -#define Natts_citus_timeseries_tables 6 +#include "distributed/listutils.h" +#include "distributed/metadata_cache.h" +#include "distributed/multi_partitioning_utils.h" +#include "distributed/resource_lock.h" +#include "timeseries/timeseries_utils.h" -#define INTERVAL_TO_SEC(ivp) \ - (((double) (ivp)->time) / ((double) USECS_PER_SEC) + \ - (ivp)->day * (24.0 * SECS_PER_HOUR) + \ - (ivp)->month * (30.0 * SECS_PER_DAY)) +#define Natts_citus_timeseries_tables 6 PG_FUNCTION_INFO_V1(create_timeseries_table); -static Oid TimeseriesNamespaceId(void); -static Oid CitusTimeseriesTablesRelationId(); static void InitiateTimeseriesTablePartitions(Oid relationId); static void InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval, int preMakePartitionCount, int postMakePartitionCount, Interval *compressionThresholdInterval, @@ -48,8 +43,6 @@ static void InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionI static void ErrorIfNotSuitableToConvertTimeseriesTable(Oid relationId, Interval *partitionInterval, Interval *compresstionThresholdInterval, Interval *retentionThresholdInterval); -static bool CheckIntervalAlignnmentWithPartitionKey(PartitionKey partitionKey, Interval *partitionInterval); -static bool CheckIntervalAlignmentWithThresholds(Interval *partitionInterval, Interval *compressionThreshold, Interval *retentionThreshold); /* * create_timeseries_table gets a table name, partition interval @@ -93,6 +86,10 @@ create_timeseries_table(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +/* + * Check whether the given table and intervals are suitable to convert a table to timeseries table, + * if not error out. + */ static void ErrorIfNotSuitableToConvertTimeseriesTable(Oid relationId, Interval *partitionInterval, Interval *compressionThresholdInterval, Interval *retentionThresholdInterval) { Relation pgPartitionedTableRelation; @@ -123,12 +120,12 @@ static void ErrorIfNotSuitableToConvertTimeseriesTable(Oid relationId, Interval if (!CheckIntervalAlignmentWithThresholds(partitionInterval, compressionThresholdInterval, retentionThresholdInterval)) { - ereport(ERROR, (errmsg("must be retention threshold > compression threshold > partition interval"))); + ereport(ERROR, (errmsg("retention threshold must be greater than compression threshold and compresstion threshold must be greater than partition interval"))); } if (!CheckIntervalAlignnmentWithPartitionKey(partitionKey, partitionInterval)) { - ereport(ERROR, (errmsg("partition interval for partition on date must be multiple days"))); + ereport(ERROR, (errmsg("partition interval for table partitioned on date must be multiple days"))); } if (compressionThresholdInterval != NULL && !CheckIntervalAlignnmentWithPartitionKey(partitionKey, compressionThresholdInterval)) @@ -144,49 +141,6 @@ static void ErrorIfNotSuitableToConvertTimeseriesTable(Oid relationId, Interval table_close(pgPartitionedTableRelation, NoLock); } -/* - * Compare partition interval, compression threshold and retenetion threshold. Note that - * compression threshold or retention threshold can be null. - */ -static bool CheckIntervalAlignmentWithThresholds(Interval *partitionInterval, Interval *compressionThreshold, Interval *retentionThreshold) -{ - bool compressionGreaterThanInterval = compressionThreshold == NULL ? true : INTERVAL_TO_SEC(compressionThreshold) > INTERVAL_TO_SEC(partitionInterval); - bool retentionGreaterThanInterval = retentionThreshold == NULL ? true : INTERVAL_TO_SEC(retentionThreshold) > INTERVAL_TO_SEC(partitionInterval); - bool retentionGreaterThanCompression = compressionThreshold == NULL || retentionThreshold == NULL ? true : INTERVAL_TO_SEC(retentionThreshold) > INTERVAL_TO_SEC(compressionThreshold); - - return compressionGreaterThanInterval && retentionGreaterThanInterval && retentionGreaterThanCompression; -} - -/* - * Check whether the given partition interval aligns with the partition column of the table. - */ -static bool CheckIntervalAlignnmentWithPartitionKey(PartitionKey partitionKey, Interval *partitionInterval) -{ - Oid partTypeId; - HeapTuple typeTuple; - Form_pg_type typeForm; - - partTypeId = partitionKey->parttypid[0]; - typeTuple = SearchSysCache1(TYPEOID, partTypeId); - typeForm = (Form_pg_type) GETSTRUCT(typeTuple); - ReleaseSysCache(typeTuple); - - if(strncmp(typeForm->typname.data, "date", NAMEDATALEN) == 0) - { - if (partitionInterval->time == 0) - { - return true; - } - } - else if(strncmp(typeForm->typname.data, "timestamp", NAMEDATALEN) == 0 || - strncmp(typeForm->typname.data, "timestamptz", NAMEDATALEN) == 0) - { - return true; - } - - return false; -} - /* * Create the initial pre and post make partitions for the given relation id * by getting the related information from citus_timeseries_tables and utilizing @@ -253,28 +207,3 @@ InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval, int table_close(citusTimeseriesTable, NoLock); } - -/* - * Get the relation id for citus_timeseries_tables metadata table - */ -static Oid -CitusTimeseriesTablesRelationId() -{ - Oid relationId = get_relname_relid("citus_timeseries_tables", TimeseriesNamespaceId()); - if (relationId == InvalidOid) - { - ereport(ERROR, (errmsg("cache lookup failed for citus_timeseries_tables, called too early?"))); - } - - return relationId; -} - -/* - * TimeseriesNamespaceId returns namespace id of the schema we store timeseries - * related metadata tables. - */ -static Oid -TimeseriesNamespaceId(void) -{ - return get_namespace_oid("citus_timeseries", false); -} \ No newline at end of file diff --git a/src/backend/timeseries/sql/timeseries--10.1-1--10.2-1.sql b/src/backend/timeseries/sql/timeseries--10.1-1--10.2-1.sql index 0a98e00c2..9306c18f9 100644 --- a/src/backend/timeseries/sql/timeseries--10.1-1--10.2-1.sql +++ b/src/backend/timeseries/sql/timeseries--10.1-1--10.2-1.sql @@ -19,3 +19,6 @@ GRANT USAGE ON SCHEMA citus_timeseries TO PUBLIC; GRANT SELECT ON ALL tables IN SCHEMA citus_timeseries TO PUBLIC; RESET search_path; + +-- Add trigger to delete from here +-- Add trigger to unschedule cron jobs in future diff --git a/src/backend/timeseries/sql/udfs/create_timeseries_table/latest.sql b/src/backend/timeseries/sql/udfs/create_timeseries_table/latest.sql index 35c7ca55f..89de3869e 100644 --- a/src/backend/timeseries/sql/udfs/create_timeseries_table/latest.sql +++ b/src/backend/timeseries/sql/udfs/create_timeseries_table/latest.sql @@ -4,7 +4,7 @@ CREATE OR REPLACE FUNCTION pg_catalog.create_timeseries_table( premake_interval_count int DEFAULT 7, postmake_interval_count int DEFAULT 7, compression_threshold INTERVAL DEFAULT NULL, - retention_threshold INTERVAL DEFAULT NULL) + retention_threshold INTERVAL DEFAULT NULL) -- can change the order with compression, raise a message about dropping data RETURNS void LANGUAGE C AS 'MODULE_PATHNAME', 'create_timeseries_table'; diff --git a/src/backend/timeseries/timeseries_utils.c b/src/backend/timeseries/timeseries_utils.c new file mode 100644 index 000000000..b152457e6 --- /dev/null +++ b/src/backend/timeseries/timeseries_utils.c @@ -0,0 +1,98 @@ +/*------------------------------------------------------------------------- + * + * timeseries_utils.c + * + * This file contains utility functions for timeseries tables + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/htup_details.h" +#include "catalog/pg_type.h" +#include "catalog/namespace.h" +#include "datatype/timestamp.h" +#include "partitioning/partdefs.h" +#include "utils/lsyscache.h" +#include "utils/partcache.h" +#include "utils/syscache.h" + +#include "timeseries/timeseries_utils.h" + +#define INTERVAL_TO_SEC(ivp) \ + (((double) (ivp)->time) / ((double) USECS_PER_SEC) + \ + (ivp)->day * (24.0 * SECS_PER_HOUR) + \ + (ivp)->month * (30.0 * SECS_PER_DAY)) + +/* + * Get the relation id for citus_timeseries_tables metadata table + */ +Oid +CitusTimeseriesTablesRelationId() +{ + Oid relationId = get_relname_relid("citus_timeseries_tables", TimeseriesNamespaceId()); + if (relationId == InvalidOid) + { + ereport(ERROR, (errmsg("cache lookup failed for citus_timeseries_tables, called too early?"))); + } + + return relationId; +} + +/* + * TimeseriesNamespaceId returns namespace id of the schema we store timeseries + * related metadata tables. + */ +Oid +TimeseriesNamespaceId() +{ + return get_namespace_oid("citus_timeseries", false); +} + +/* + * Compare partition interval, compression threshold and retenetion threshold. Note that + * compression threshold or retention threshold can be null. + */ +bool +CheckIntervalAlignmentWithThresholds(Interval *partitionInterval, Interval *compressionThreshold, Interval *retentionThreshold) +{ + bool compressionGreaterThanInterval = compressionThreshold == NULL ? true : INTERVAL_TO_SEC(compressionThreshold) > INTERVAL_TO_SEC(partitionInterval); + bool retentionGreaterThanInterval = retentionThreshold == NULL ? true : INTERVAL_TO_SEC(retentionThreshold) > INTERVAL_TO_SEC(partitionInterval); + bool retentionGreaterThanCompression = compressionThreshold == NULL || retentionThreshold == NULL ? true : INTERVAL_TO_SEC(retentionThreshold) > INTERVAL_TO_SEC(compressionThreshold); + + return compressionGreaterThanInterval && retentionGreaterThanInterval && retentionGreaterThanCompression; +} + +/* + * Check whether the given partition interval aligns with the partition column of the table. + */ +bool +CheckIntervalAlignnmentWithPartitionKey(PartitionKey partitionKey, Interval *partitionInterval) +{ + Oid partTypeId; + HeapTuple typeTuple; + Form_pg_type typeForm; + + partTypeId = partitionKey->parttypid[0]; + typeTuple = SearchSysCache1(TYPEOID, partTypeId); + typeForm = (Form_pg_type) GETSTRUCT(typeTuple); + ReleaseSysCache(typeTuple); + + if(strncmp(typeForm->typname.data, "date", NAMEDATALEN) == 0) + { + if (partitionInterval->time == 0) + { + return true; + } + } + else if(strncmp(typeForm->typname.data, "timestamp", NAMEDATALEN) == 0 || + strncmp(typeForm->typname.data, "timestamptz", NAMEDATALEN) == 0) + { + return true; + } + + return false; +} \ No newline at end of file diff --git a/src/include/timeseries/timeseries_utils.h b/src/include/timeseries/timeseries_utils.h new file mode 100644 index 000000000..e04a53dac --- /dev/null +++ b/src/include/timeseries/timeseries_utils.h @@ -0,0 +1,24 @@ +/*------------------------------------------------------------------------- + * + * timeseries_utils.h + * + * Declarations for public utility functions related timeseries + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef TIMESERIES_UTILS_H_ +#define TIMESERIES_UTILS_H_ + +#include "postgres.h" +#include "server/datatype/timestamp.h" +#include "server/partitioning/partdefs.h" + +extern Oid CitusTimeseriesTablesRelationId(void); +extern Oid TimeseriesNamespaceId(void); +extern bool CheckIntervalAlignmentWithThresholds(Interval *partitionInterval, Interval *compressionThreshold, Interval *retentionThreshold); +extern bool CheckIntervalAlignnmentWithPartitionKey(PartitionKey partitionKey, Interval *partitionInterval); + +#endif /* TIMESERIES_UTILS_H_ */