diff --git a/src/backend/timeseries/create_timeseries_table.c b/src/backend/timeseries/create_timeseries_table.c index 373922929..67d3ba55c 100644 --- a/src/backend/timeseries/create_timeseries_table.c +++ b/src/backend/timeseries/create_timeseries_table.c @@ -32,12 +32,15 @@ #include "distributed/resource_lock.h" #include "timeseries/timeseries_utils.h" +#define default_premake_interval_count 7 + PG_FUNCTION_INFO_V1(create_timeseries_table); -static void InitiateTimeseriesTablePartitions(Oid relationId); +static void InitiateTimeseriesTablePartitions(Oid relationId, bool useStartFrom); static void InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval, - int preMakePartitionCount, - int postMakePartitionCount, + int postMakePartitionCount, int + preMakePartitionCount, + TimestampTz startFrom, Interval *compressionThresholdInterval, Interval *retentionThresholdInterval); static void ErrorIfNotSuitableToConvertTimeseriesTable(Oid relationId, @@ -59,35 +62,61 @@ create_timeseries_table(PG_FUNCTION_ARGS) if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) { + ereport(ERROR, (errmsg("table name and partition interval " + "must be provided"))); PG_RETURN_VOID(); } + if (!PG_ARGISNULL(3) && !PG_ARGISNULL(4)) + { + ereport(ERROR, (errmsg("either premakeintervalcount or startfrom " + "should be provided"))); + } + Oid relationId = PG_GETARG_OID(0); Interval *partitionInterval = PG_GETARG_INTERVAL_P(1); - int preMakePartitionCount = PG_GETARG_INT32(2); - int postMakePartitionCount = PG_GETARG_INT32(3); + int postMakePartitionCount = PG_GETARG_INT32(2); + int preMakePartitionCount = 0; + TimestampTz startFrom = 0; Interval *compressionThresholdInterval = NULL; Interval *retentionThresholdInterval = NULL; + bool useStartFrom = false; - if (!PG_ARGISNULL(4)) + if (!PG_ARGISNULL(3)) { - compressionThresholdInterval = DatumGetIntervalP(PG_GETARG_DATUM(4)); + preMakePartitionCount = DatumGetInt32(PG_GETARG_DATUM(3)); } + else if (!PG_ARGISNULL(4)) + { + startFrom = DatumGetTimestampTz(PG_GETARG_DATUM(4)); + useStartFrom = true; + } + else + { + preMakePartitionCount = default_premake_interval_count; + } + if (!PG_ARGISNULL(5)) { - retentionThresholdInterval = DatumGetIntervalP(PG_GETARG_DATUM(5)); + compressionThresholdInterval = DatumGetIntervalP(PG_GETARG_DATUM(5)); + } + + if (!PG_ARGISNULL(6)) + { + retentionThresholdInterval = DatumGetIntervalP(PG_GETARG_DATUM(6)); } ErrorIfNotSuitableToConvertTimeseriesTable(relationId, partitionInterval, compressionThresholdInterval, retentionThresholdInterval); - InsertIntoCitusTimeseriesTables(relationId, partitionInterval, preMakePartitionCount, - postMakePartitionCount, compressionThresholdInterval, + InsertIntoCitusTimeseriesTables(relationId, partitionInterval, postMakePartitionCount, + preMakePartitionCount, startFrom, + compressionThresholdInterval, retentionThresholdInterval); - InitiateTimeseriesTablePartitions(relationId); + InitiateTimeseriesTablePartitions(relationId, useStartFrom); PG_RETURN_VOID(); } @@ -165,13 +194,22 @@ ErrorIfNotSuitableToConvertTimeseriesTable(Oid relationId, Interval *partitionIn * create_missing_partitions */ static void -InitiateTimeseriesTablePartitions(Oid relationId) +InitiateTimeseriesTablePartitions(Oid relationId, bool useStartFrom) { bool readOnly = false; StringInfo initiateTimeseriesPartitionsCommand = makeStringInfo(); - appendStringInfo(initiateTimeseriesPartitionsCommand, - "SELECT create_missing_partitions(logicalrelid, now() + partitioninterval * postmakeintervalcount, now() - partitioninterval * premakeintervalcount) from citus_timeseries.citus_timeseries_tables WHERE logicalrelid = %d;", - relationId); + if (useStartFrom) + { + appendStringInfo(initiateTimeseriesPartitionsCommand, + "SELECT create_missing_partitions(logicalrelid, now() + partitioninterval * postmakeintervalcount, startfrom) from citus_timeseries.citus_timeseries_tables WHERE logicalrelid = %d;", + relationId); + } + else + { + appendStringInfo(initiateTimeseriesPartitionsCommand, + "SELECT create_missing_partitions(logicalrelid, now() + partitioninterval * postmakeintervalcount, now() - partitioninterval * premakeintervalcount) from citus_timeseries.citus_timeseries_tables WHERE logicalrelid = %d;", + relationId); + } int spiConnectionResult = SPI_connect(); if (spiConnectionResult != SPI_OK_CONNECT) @@ -192,8 +230,9 @@ InitiateTimeseriesTablePartitions(Oid relationId) * Add tuples for the given table to the citus_timeseries_tables using given params */ static void -InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval, int - preMakePartitionCount, int postMakePartitionCount, +InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval, + int postMakePartitionCount, int preMakePartitionCount, + TimestampTz startFrom, Interval *compressionThresholdInterval, Interval *retentionThresholdInterval) { @@ -203,32 +242,40 @@ InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval, int Relation citusTimeseriesTable = table_open(CitusTimeseriesTablesRelationId(), RowExclusiveLock); - memset(newValues, 0, sizeof(newValues)); memset(isNulls, false, sizeof(isNulls)); newValues[0] = ObjectIdGetDatum(relationId); newValues[1] = IntervalPGetDatum(partitionInterval); - newValues[2] = Int32GetDatum(preMakePartitionCount); - newValues[3] = Int32GetDatum(postMakePartitionCount); + newValues[2] = Int32GetDatum(postMakePartitionCount); + newValues[3] = Int32GetDatum(preMakePartitionCount); - if (compressionThresholdInterval != NULL) + if (startFrom != 0) { - newValues[4] = IntervalPGetDatum(compressionThresholdInterval); + newValues[4] = TimestampTzGetDatum(startFrom); } else { isNulls[4] = true; } - if (retentionThresholdInterval != NULL) + if (compressionThresholdInterval != NULL) { - newValues[5] = IntervalPGetDatum(retentionThresholdInterval); + newValues[5] = IntervalPGetDatum(compressionThresholdInterval); } else { isNulls[5] = true; } + if (retentionThresholdInterval != NULL) + { + newValues[6] = IntervalPGetDatum(retentionThresholdInterval); + } + else + { + isNulls[6] = true; + } + HeapTuple newTuple = heap_form_tuple(RelationGetDescr(citusTimeseriesTable), newValues, isNulls); CatalogTupleInsert(citusTimeseriesTable, newTuple); diff --git a/src/backend/timeseries/drop_timeseries_table.c b/src/backend/timeseries/drop_timeseries_table.c index 2ae2b17f5..1976a100a 100644 --- a/src/backend/timeseries/drop_timeseries_table.c +++ b/src/backend/timeseries/drop_timeseries_table.c @@ -29,7 +29,7 @@ PG_FUNCTION_INFO_V1(drop_timeseries_table); * drop_timeseries_table gets the table oid, then it drops * all the metadata related to it. Note that this function doesn't * drop any partitions or any data of the given table. - * + * * TODO: Add unscheduling for automated jobs as well. */ Datum @@ -44,15 +44,17 @@ drop_timeseries_table(PG_FUNCTION_ARGS) Oid relationId = PG_GETARG_OID(0); - ScanKeyData relIdKey[1]; - Relation timeseriesRelation = table_open(CitusTimeseriesTablesRelationId(), AccessShareLock); + ScanKeyData relIdKey[1]; + Relation timeseriesRelation = table_open(CitusTimeseriesTablesRelationId(), + AccessShareLock); ScanKeyInit(&relIdKey[0], Anum_citus_timeseries_table_relation_id, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId)); - SysScanDesc timeseriesRelScan = systable_beginscan(timeseriesRelation, InvalidOid, false, NULL, 1, relIdKey); + SysScanDesc timeseriesRelScan = systable_beginscan(timeseriesRelation, InvalidOid, + false, NULL, 1, relIdKey); HeapTuple timeseriesTuple = systable_getnext(timeseriesRelScan); if (HeapTupleIsValid(timeseriesTuple)) @@ -60,8 +62,8 @@ drop_timeseries_table(PG_FUNCTION_ARGS) CatalogTupleDelete(timeseriesRelation, ×eriesTuple->t_self); CommandCounterIncrement(); } - - systable_endscan(timeseriesRelScan); + + systable_endscan(timeseriesRelScan); table_close(timeseriesRelation, NoLock); PG_RETURN_VOID(); diff --git a/src/backend/timeseries/sql/timeseries--10.1-1--10.2-1.sql b/src/backend/timeseries/sql/timeseries--10.1-1--10.2-1.sql index fd35dda8a..cc8375729 100644 --- a/src/backend/timeseries/sql/timeseries--10.1-1--10.2-1.sql +++ b/src/backend/timeseries/sql/timeseries--10.1-1--10.2-1.sql @@ -6,8 +6,9 @@ SET search_path TO citus_timeseries; CREATE TABLE citus_timeseries_tables ( logicalrelid regclass NOT NULL PRIMARY KEY, partitioninterval INTERVAL NOT NULL, - premakeintervalcount INT NOT NULL, postmakeintervalcount INT NOT NULL, + premakeintervalcount INT, + startfrom timestamptz, compressionthreshold INTERVAL, retentionthreshold INTERVAL ); @@ -19,6 +20,3 @@ GRANT USAGE ON SCHEMA citus_timeseries TO PUBLIC; GRANT SELECT ON ALL tables IN SCHEMA citus_timeseries TO PUBLIC; RESET search_path; - --- TODO: Add trigger to delete from here --- TODO: Add trigger to unschedule cron jobs in future diff --git a/src/backend/timeseries/sql/udfs/create_timeseries_table/10.2-1.sql b/src/backend/timeseries/sql/udfs/create_timeseries_table/10.2-1.sql index b2b0d3d4c..67d434333 100644 --- a/src/backend/timeseries/sql/udfs/create_timeseries_table/10.2-1.sql +++ b/src/backend/timeseries/sql/udfs/create_timeseries_table/10.2-1.sql @@ -1,23 +1,24 @@ CREATE OR REPLACE FUNCTION pg_catalog.create_timeseries_table( table_name regclass, partition_interval INTERVAL, - premake_interval_count int DEFAULT 7, postmake_interval_count int DEFAULT 7, + premake_interval_count int DEFAULT NULL, + start_from timestamptz DEFAULT NULL, compression_threshold INTERVAL DEFAULT NULL, retention_threshold INTERVAL DEFAULT NULL) -- can change the order with compression, raise a message about dropping data RETURNS void LANGUAGE C AS 'MODULE_PATHNAME', 'create_timeseries_table'; + COMMENT ON FUNCTION pg_catalog.create_timeseries_table( table_name regclass, partition_interval INTERVAL, - premake_interval_count int, postmake_interval_count int, + premake_interval_count int, + start_from timestamptz, compression_threshold INTERVAL, retention_threshold INTERVAL) IS 'creates a citus timeseries table which will be autopartitioned'; --- Add UDF to undo that! - keep in mind but plan depending on the resources --- Add note about alter won't change existing tables --- one function instead of multiple for compression and expiration \ No newline at end of file +-- TODO: Add note about alter won't change existing tables diff --git a/src/backend/timeseries/sql/udfs/create_timeseries_table/latest.sql b/src/backend/timeseries/sql/udfs/create_timeseries_table/latest.sql index 89de3869e..0799c12c4 100644 --- a/src/backend/timeseries/sql/udfs/create_timeseries_table/latest.sql +++ b/src/backend/timeseries/sql/udfs/create_timeseries_table/latest.sql @@ -1,19 +1,22 @@ CREATE OR REPLACE FUNCTION pg_catalog.create_timeseries_table( table_name regclass, partition_interval INTERVAL, - premake_interval_count int DEFAULT 7, postmake_interval_count int DEFAULT 7, + premake_interval_count int DEFAULT NULL, + start_from timestamptz DEFAULT NULL, compression_threshold INTERVAL DEFAULT NULL, retention_threshold INTERVAL DEFAULT NULL) -- can change the order with compression, raise a message about dropping data RETURNS void LANGUAGE C AS 'MODULE_PATHNAME', 'create_timeseries_table'; + COMMENT ON FUNCTION pg_catalog.create_timeseries_table( table_name regclass, partition_interval INTERVAL, - premake_interval_count int, postmake_interval_count int, + premake_interval_count int, + start_from timestamptz, compression_threshold INTERVAL, retention_threshold INTERVAL) IS 'creates a citus timeseries table which will be autopartitioned'; diff --git a/src/backend/timeseries/sql/udfs/drop_timeseries_table/latest.sql b/src/backend/timeseries/sql/udfs/drop_timeseries_table/latest.sql index a96def434..35b770e17 100644 --- a/src/backend/timeseries/sql/udfs/drop_timeseries_table/latest.sql +++ b/src/backend/timeseries/sql/udfs/drop_timeseries_table/latest.sql @@ -7,4 +7,3 @@ AS 'MODULE_PATHNAME', 'drop_timeseries_table'; COMMENT ON FUNCTION pg_catalog.drop_timeseries_table( logicalrelid regclass) IS 'drops a citus timeseries table by removing metadata'; --- TODO: Update comment for unscheduling diff --git a/src/backend/timeseries/sql/udfs/get_missing_partition_ranges/10.2-1.sql b/src/backend/timeseries/sql/udfs/get_missing_partition_ranges/10.2-1.sql index cc0654516..6c366be91 100644 --- a/src/backend/timeseries/sql/udfs/get_missing_partition_ranges/10.2-1.sql +++ b/src/backend/timeseries/sql/udfs/get_missing_partition_ranges/10.2-1.sql @@ -9,6 +9,7 @@ returns table( LANGUAGE plpgsql AS $$ DECLARE + current_partition_count int; table_partition_interval INTERVAL; table_partition_column_type_name text; current_range_from_value timestamptz := NULL; @@ -17,15 +18,8 @@ BEGIN /* * First check whether such timeseries table exists. If not, error out. * - * Then check if start_from is given. If it is, create the partition for that time - * and let remaining of the function fill the gap from start_from to to_date. - * - * TODO: That part is implemented by assuming that there is no partition exist for - * the given table, in other words that function will be called via create_timeseries_table - * only when start_from is given. That will be handled while adding starting time for old - * data ingestion. That part will be implemented with Task 1.2. - * - * TODO: Add tests to cover start_from for the PR to handle Task 1.2. + * Then check if start_from is given. If it is, create the range using that time + * and let remaining of the function fill the gap till to_date. */ SELECT partitioninterval @@ -37,53 +31,60 @@ BEGIN RAISE '% must be timeseries table', table_name; END IF; - -- Get datatype here to generate range values in the right data format - -- Since we already check that timeseries tables have single column to partition the table - -- we can directly get the 0th element of the partattrs column - SELECT atttypid::regtype::text INTO table_partition_column_type_name - FROM pg_attribute - WHERE attrelid = table_name::oid - AND attnum = (select partattrs[0] from pg_partitioned_table where partrelid = table_name::oid); - IF start_from IS NOT NULL THEN + + SELECT count(*) + INTO current_partition_count + FROM pg_catalog.time_partitions + WHERE parent_table = table_name; + /* - * Decide on the current_range_from_value of the initial partition according to interval of the timeseries table. - * Since we will create all other partitions by adding intervals, truncating given start time will provide - * more intuitive interval ranges, instead of starting from start_from directly. + * If any partition exist for the given table, we must start from the initial partition + * for that table and go backward to have consistent range values. Otherwise, if we start + * directly from the given start_from, we may end up with inconsistent range values. */ - IF table_partition_interval < INTERVAL '1 hour' THEN - current_range_from_value = date_trunc('minute', start_from); - ELSIF table_partition_interval < INTERVAL '1 day' THEN - current_range_from_value = date_trunc('hour', start_from); - ELSIF table_partition_interval < INTERVAL '1 week' THEN - current_range_from_value = date_trunc('day', start_from); - ELSIF table_partition_interval < INTERVAL '1 month' THEN - current_range_from_value = date_trunc('week', start_from); - ELSIF table_partition_interval < INTERVAL '1 year' THEN - current_range_from_value = date_trunc('month', start_from); - ELSE - current_range_from_value = date_trunc('year', start_from); - END IF; + IF current_partition_count > 0 THEN + SELECT from_value::timestamptz, to_value::timestamptz + INTO current_range_from_value, current_range_to_value + FROM pg_catalog.time_partitions + WHERE parent_table = table_name + ORDER BY from_value::timestamptz; - current_range_to_value := current_range_from_value + table_partition_interval; + WHILE current_range_from_value > start_from LOOP + current_range_from_value := current_range_from_value - table_partition_interval; + END LOOP; - -- TODO: Check for dynamic way to do it or create a function for it to use for the call at the end - IF table_partition_column_type_name = 'date' THEN - RETURN QUERY SELECT current_range_from_value::date::text, current_range_to_value::date::text; - ELSIF table_partition_column_type_name = 'timestamp without time zone' THEN - RETURN QUERY SELECT current_range_from_value::timestamp::text, current_range_to_value::timestamp::text; - ELSIF table_partition_column_type_name = 'timestamp with time zone' THEN - RETURN QUERY SELECT current_range_from_value::timestamptz::text, current_range_to_value::timestamptz::text; + current_range_to_value := current_range_from_value + table_partition_interval; ELSE - RAISE 'type of the partition column of the table % must be date, timestamp or timestamptz', table_name; + /* + * Decide on the current_range_from_value of the initial partition according to interval of the timeseries table. + * Since we will create all other partitions by adding intervals, truncating given start time will provide + * more intuitive interval ranges, instead of starting from start_from directly. + */ + IF table_partition_interval < INTERVAL '1 hour' THEN + current_range_from_value = date_trunc('minute', start_from); + ELSIF table_partition_interval < INTERVAL '1 day' THEN + current_range_from_value = date_trunc('hour', start_from); + ELSIF table_partition_interval < INTERVAL '1 week' THEN + current_range_from_value = date_trunc('day', start_from); + ELSIF table_partition_interval < INTERVAL '1 month' THEN + current_range_from_value = date_trunc('week', start_from); + ELSIF table_partition_interval < INTERVAL '1 year' THEN + current_range_from_value = date_trunc('month', start_from); + ELSE + current_range_from_value = date_trunc('year', start_from); + END IF; + + current_range_to_value := current_range_from_value + table_partition_interval; END IF; END IF; /* * To be able to fill any gaps after the initial partition of the timeseries table, - * we are starting from the first partition instead of the last. + * we are starting from the first partition instead of the last. If start_from + * is given we've already used that to initiate ranges. * - * Also note that we must have either start_from or an initial partition for the timeseries + * Note that we must have either start_from or an initial partition for the timeseries * table, as we call that function while creating timeseries table first. */ IF current_range_from_value IS NULL AND current_range_to_value IS NULL THEN @@ -94,23 +95,36 @@ BEGIN ORDER BY from_value::timestamptz; END IF; - WHILE current_range_to_value < to_date LOOP - current_range_from_value := current_range_to_value; - current_range_to_value := current_range_to_value + table_partition_interval; + /* + * Get datatype here to generate range values in the right data format + * Since we already check that timeseries tables have single column to partition the table + * we can directly get the 0th element of the partattrs column + */ + SELECT atttypid::regtype::text INTO table_partition_column_type_name + FROM pg_attribute + WHERE attrelid = table_name::oid + AND attnum = (select partattrs[0] from pg_partitioned_table where partrelid = table_name::oid); - -- Check whether partition with given range has already been created - -- Since partition interval can be given as, we are converting all variables to timestamptz to make sure - -- that we are comparing same type of parameters + WHILE current_range_from_value < to_date LOOP + /* + * Check whether partition with given range has already been created + * Since partition interval can be given as, we are converting all variables to timestamptz to make sure + * that we are comparing same type of parameters + */ PERFORM * FROM pg_catalog.time_partitions WHERE from_value::timestamptz = current_range_from_value::timestamptz AND to_value::timestamptz = current_range_to_value::timestamptz; IF found THEN + current_range_from_value := current_range_to_value; + current_range_to_value := current_range_to_value + table_partition_interval; CONTINUE; END IF; - -- Check whether any other partition covers from_value or to_value - -- That means some partitions have been created manually and we must error out. + /* + * Check whether any other partition covers from_value or to_value + * That means some partitions have been created manually and we must error out. + */ PERFORM * FROM pg_catalog.time_partitions - WHERE (current_range_from_value::timestamptz >= from_value::timestamptz AND current_range_from_value < to_value::timestamptz) OR - (current_range_to_value::timestamptz >= from_value::timestamptz AND current_range_to_value::timestamptz < to_value::timestamptz); + WHERE (current_range_from_value::timestamptz > from_value::timestamptz AND current_range_from_value < to_value::timestamptz) OR + (current_range_to_value::timestamptz > from_value::timestamptz AND current_range_to_value::timestamptz < to_value::timestamptz); IF found THEN RAISE 'For the table % manual partition(s) has been created, Please remove them to continue using that table as timeseries table', table_name; END IF; @@ -124,6 +138,9 @@ BEGIN ELSE RAISE 'type of the partition column of the table % must be date, timestamp or timestamptz', table_name; END IF; + + current_range_from_value := current_range_to_value; + current_range_to_value := current_range_to_value + table_partition_interval; END LOOP; RETURN; diff --git a/src/backend/timeseries/sql/udfs/get_missing_partition_ranges/latest.sql b/src/backend/timeseries/sql/udfs/get_missing_partition_ranges/latest.sql index cc0654516..6c366be91 100644 --- a/src/backend/timeseries/sql/udfs/get_missing_partition_ranges/latest.sql +++ b/src/backend/timeseries/sql/udfs/get_missing_partition_ranges/latest.sql @@ -9,6 +9,7 @@ returns table( LANGUAGE plpgsql AS $$ DECLARE + current_partition_count int; table_partition_interval INTERVAL; table_partition_column_type_name text; current_range_from_value timestamptz := NULL; @@ -17,15 +18,8 @@ BEGIN /* * First check whether such timeseries table exists. If not, error out. * - * Then check if start_from is given. If it is, create the partition for that time - * and let remaining of the function fill the gap from start_from to to_date. - * - * TODO: That part is implemented by assuming that there is no partition exist for - * the given table, in other words that function will be called via create_timeseries_table - * only when start_from is given. That will be handled while adding starting time for old - * data ingestion. That part will be implemented with Task 1.2. - * - * TODO: Add tests to cover start_from for the PR to handle Task 1.2. + * Then check if start_from is given. If it is, create the range using that time + * and let remaining of the function fill the gap till to_date. */ SELECT partitioninterval @@ -37,53 +31,60 @@ BEGIN RAISE '% must be timeseries table', table_name; END IF; - -- Get datatype here to generate range values in the right data format - -- Since we already check that timeseries tables have single column to partition the table - -- we can directly get the 0th element of the partattrs column - SELECT atttypid::regtype::text INTO table_partition_column_type_name - FROM pg_attribute - WHERE attrelid = table_name::oid - AND attnum = (select partattrs[0] from pg_partitioned_table where partrelid = table_name::oid); - IF start_from IS NOT NULL THEN + + SELECT count(*) + INTO current_partition_count + FROM pg_catalog.time_partitions + WHERE parent_table = table_name; + /* - * Decide on the current_range_from_value of the initial partition according to interval of the timeseries table. - * Since we will create all other partitions by adding intervals, truncating given start time will provide - * more intuitive interval ranges, instead of starting from start_from directly. + * If any partition exist for the given table, we must start from the initial partition + * for that table and go backward to have consistent range values. Otherwise, if we start + * directly from the given start_from, we may end up with inconsistent range values. */ - IF table_partition_interval < INTERVAL '1 hour' THEN - current_range_from_value = date_trunc('minute', start_from); - ELSIF table_partition_interval < INTERVAL '1 day' THEN - current_range_from_value = date_trunc('hour', start_from); - ELSIF table_partition_interval < INTERVAL '1 week' THEN - current_range_from_value = date_trunc('day', start_from); - ELSIF table_partition_interval < INTERVAL '1 month' THEN - current_range_from_value = date_trunc('week', start_from); - ELSIF table_partition_interval < INTERVAL '1 year' THEN - current_range_from_value = date_trunc('month', start_from); - ELSE - current_range_from_value = date_trunc('year', start_from); - END IF; + IF current_partition_count > 0 THEN + SELECT from_value::timestamptz, to_value::timestamptz + INTO current_range_from_value, current_range_to_value + FROM pg_catalog.time_partitions + WHERE parent_table = table_name + ORDER BY from_value::timestamptz; - current_range_to_value := current_range_from_value + table_partition_interval; + WHILE current_range_from_value > start_from LOOP + current_range_from_value := current_range_from_value - table_partition_interval; + END LOOP; - -- TODO: Check for dynamic way to do it or create a function for it to use for the call at the end - IF table_partition_column_type_name = 'date' THEN - RETURN QUERY SELECT current_range_from_value::date::text, current_range_to_value::date::text; - ELSIF table_partition_column_type_name = 'timestamp without time zone' THEN - RETURN QUERY SELECT current_range_from_value::timestamp::text, current_range_to_value::timestamp::text; - ELSIF table_partition_column_type_name = 'timestamp with time zone' THEN - RETURN QUERY SELECT current_range_from_value::timestamptz::text, current_range_to_value::timestamptz::text; + current_range_to_value := current_range_from_value + table_partition_interval; ELSE - RAISE 'type of the partition column of the table % must be date, timestamp or timestamptz', table_name; + /* + * Decide on the current_range_from_value of the initial partition according to interval of the timeseries table. + * Since we will create all other partitions by adding intervals, truncating given start time will provide + * more intuitive interval ranges, instead of starting from start_from directly. + */ + IF table_partition_interval < INTERVAL '1 hour' THEN + current_range_from_value = date_trunc('minute', start_from); + ELSIF table_partition_interval < INTERVAL '1 day' THEN + current_range_from_value = date_trunc('hour', start_from); + ELSIF table_partition_interval < INTERVAL '1 week' THEN + current_range_from_value = date_trunc('day', start_from); + ELSIF table_partition_interval < INTERVAL '1 month' THEN + current_range_from_value = date_trunc('week', start_from); + ELSIF table_partition_interval < INTERVAL '1 year' THEN + current_range_from_value = date_trunc('month', start_from); + ELSE + current_range_from_value = date_trunc('year', start_from); + END IF; + + current_range_to_value := current_range_from_value + table_partition_interval; END IF; END IF; /* * To be able to fill any gaps after the initial partition of the timeseries table, - * we are starting from the first partition instead of the last. + * we are starting from the first partition instead of the last. If start_from + * is given we've already used that to initiate ranges. * - * Also note that we must have either start_from or an initial partition for the timeseries + * Note that we must have either start_from or an initial partition for the timeseries * table, as we call that function while creating timeseries table first. */ IF current_range_from_value IS NULL AND current_range_to_value IS NULL THEN @@ -94,23 +95,36 @@ BEGIN ORDER BY from_value::timestamptz; END IF; - WHILE current_range_to_value < to_date LOOP - current_range_from_value := current_range_to_value; - current_range_to_value := current_range_to_value + table_partition_interval; + /* + * Get datatype here to generate range values in the right data format + * Since we already check that timeseries tables have single column to partition the table + * we can directly get the 0th element of the partattrs column + */ + SELECT atttypid::regtype::text INTO table_partition_column_type_name + FROM pg_attribute + WHERE attrelid = table_name::oid + AND attnum = (select partattrs[0] from pg_partitioned_table where partrelid = table_name::oid); - -- Check whether partition with given range has already been created - -- Since partition interval can be given as, we are converting all variables to timestamptz to make sure - -- that we are comparing same type of parameters + WHILE current_range_from_value < to_date LOOP + /* + * Check whether partition with given range has already been created + * Since partition interval can be given as, we are converting all variables to timestamptz to make sure + * that we are comparing same type of parameters + */ PERFORM * FROM pg_catalog.time_partitions WHERE from_value::timestamptz = current_range_from_value::timestamptz AND to_value::timestamptz = current_range_to_value::timestamptz; IF found THEN + current_range_from_value := current_range_to_value; + current_range_to_value := current_range_to_value + table_partition_interval; CONTINUE; END IF; - -- Check whether any other partition covers from_value or to_value - -- That means some partitions have been created manually and we must error out. + /* + * Check whether any other partition covers from_value or to_value + * That means some partitions have been created manually and we must error out. + */ PERFORM * FROM pg_catalog.time_partitions - WHERE (current_range_from_value::timestamptz >= from_value::timestamptz AND current_range_from_value < to_value::timestamptz) OR - (current_range_to_value::timestamptz >= from_value::timestamptz AND current_range_to_value::timestamptz < to_value::timestamptz); + WHERE (current_range_from_value::timestamptz > from_value::timestamptz AND current_range_from_value < to_value::timestamptz) OR + (current_range_to_value::timestamptz > from_value::timestamptz AND current_range_to_value::timestamptz < to_value::timestamptz); IF found THEN RAISE 'For the table % manual partition(s) has been created, Please remove them to continue using that table as timeseries table', table_name; END IF; @@ -124,6 +138,9 @@ BEGIN ELSE RAISE 'type of the partition column of the table % must be date, timestamp or timestamptz', table_name; END IF; + + current_range_from_value := current_range_to_value; + current_range_to_value := current_range_to_value + table_partition_interval; END LOOP; RETURN; diff --git a/src/include/timeseries/timeseries_utils.h b/src/include/timeseries/timeseries_utils.h index 90f537d78..62201f977 100644 --- a/src/include/timeseries/timeseries_utils.h +++ b/src/include/timeseries/timeseries_utils.h @@ -16,13 +16,14 @@ #include "server/datatype/timestamp.h" #include "server/partitioning/partdefs.h" -#define Natts_citus_timeseries_tables 6 +#define Natts_citus_timeseries_tables 7 #define Anum_citus_timeseries_table_relation_id 1 #define Anum_citus_timeseries_table_partition_interval 2 -#define Anum_citus_timeseries_table_premake_interval_count 3 -#define Anum_citus_timeseries_table_postmake_interval_count 4 -#define Anum_citus_timeseries_table_compression_threshold 5 -#define Anum_citus_timeseries_table_retention_threshold 6 +#define Anum_citus_timeseries_table_postmake_interval_count 3 +#define Anum_citus_timeseries_table_premake_interval_count 4 +#define Anum_citus_timeseries_table_start_from 5 +#define Anum_citus_timeseries_table_compression_threshold 6 +#define Anum_citus_timeseries_table_retention_threshold 7 #define INTERVAL_TO_SEC(ivp) \ (((double) (ivp)->time) / ((double) USECS_PER_SEC) + \