Add start_from parameter to create_timeseries_table

velioglu/create_timeseries_table
Burak Velioglu 2021-08-24 00:45:34 +03:00
parent 4754a86e31
commit 06dd189c8f
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
9 changed files with 240 additions and 155 deletions

View File

@ -32,12 +32,15 @@
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "timeseries/timeseries_utils.h" #include "timeseries/timeseries_utils.h"
#define default_premake_interval_count 7
PG_FUNCTION_INFO_V1(create_timeseries_table); PG_FUNCTION_INFO_V1(create_timeseries_table);
static void InitiateTimeseriesTablePartitions(Oid relationId); static void InitiateTimeseriesTablePartitions(Oid relationId, bool useStartFrom);
static void InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval, static void InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval,
int preMakePartitionCount, int postMakePartitionCount, int
int postMakePartitionCount, preMakePartitionCount,
TimestampTz startFrom,
Interval *compressionThresholdInterval, Interval *compressionThresholdInterval,
Interval *retentionThresholdInterval); Interval *retentionThresholdInterval);
static void ErrorIfNotSuitableToConvertTimeseriesTable(Oid relationId, static void ErrorIfNotSuitableToConvertTimeseriesTable(Oid relationId,
@ -59,35 +62,61 @@ create_timeseries_table(PG_FUNCTION_ARGS)
if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
{ {
ereport(ERROR, (errmsg("table name and partition interval "
"must be provided")));
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
if (!PG_ARGISNULL(3) && !PG_ARGISNULL(4))
{
ereport(ERROR, (errmsg("either premakeintervalcount or startfrom "
"should be provided")));
}
Oid relationId = PG_GETARG_OID(0); Oid relationId = PG_GETARG_OID(0);
Interval *partitionInterval = PG_GETARG_INTERVAL_P(1); Interval *partitionInterval = PG_GETARG_INTERVAL_P(1);
int preMakePartitionCount = PG_GETARG_INT32(2); int postMakePartitionCount = PG_GETARG_INT32(2);
int postMakePartitionCount = PG_GETARG_INT32(3); int preMakePartitionCount = 0;
TimestampTz startFrom = 0;
Interval *compressionThresholdInterval = NULL; Interval *compressionThresholdInterval = NULL;
Interval *retentionThresholdInterval = NULL; Interval *retentionThresholdInterval = NULL;
bool useStartFrom = false;
if (!PG_ARGISNULL(4)) if (!PG_ARGISNULL(3))
{ {
compressionThresholdInterval = DatumGetIntervalP(PG_GETARG_DATUM(4)); preMakePartitionCount = DatumGetInt32(PG_GETARG_DATUM(3));
} }
else if (!PG_ARGISNULL(4))
{
startFrom = DatumGetTimestampTz(PG_GETARG_DATUM(4));
useStartFrom = true;
}
else
{
preMakePartitionCount = default_premake_interval_count;
}
if (!PG_ARGISNULL(5)) if (!PG_ARGISNULL(5))
{ {
retentionThresholdInterval = DatumGetIntervalP(PG_GETARG_DATUM(5)); compressionThresholdInterval = DatumGetIntervalP(PG_GETARG_DATUM(5));
}
if (!PG_ARGISNULL(6))
{
retentionThresholdInterval = DatumGetIntervalP(PG_GETARG_DATUM(6));
} }
ErrorIfNotSuitableToConvertTimeseriesTable(relationId, partitionInterval, ErrorIfNotSuitableToConvertTimeseriesTable(relationId, partitionInterval,
compressionThresholdInterval, compressionThresholdInterval,
retentionThresholdInterval); retentionThresholdInterval);
InsertIntoCitusTimeseriesTables(relationId, partitionInterval, preMakePartitionCount, InsertIntoCitusTimeseriesTables(relationId, partitionInterval, postMakePartitionCount,
postMakePartitionCount, compressionThresholdInterval, preMakePartitionCount, startFrom,
compressionThresholdInterval,
retentionThresholdInterval); retentionThresholdInterval);
InitiateTimeseriesTablePartitions(relationId); InitiateTimeseriesTablePartitions(relationId, useStartFrom);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -165,13 +194,22 @@ ErrorIfNotSuitableToConvertTimeseriesTable(Oid relationId, Interval *partitionIn
* create_missing_partitions * create_missing_partitions
*/ */
static void static void
InitiateTimeseriesTablePartitions(Oid relationId) InitiateTimeseriesTablePartitions(Oid relationId, bool useStartFrom)
{ {
bool readOnly = false; bool readOnly = false;
StringInfo initiateTimeseriesPartitionsCommand = makeStringInfo(); StringInfo initiateTimeseriesPartitionsCommand = makeStringInfo();
if (useStartFrom)
{
appendStringInfo(initiateTimeseriesPartitionsCommand,
"SELECT create_missing_partitions(logicalrelid, now() + partitioninterval * postmakeintervalcount, startfrom) from citus_timeseries.citus_timeseries_tables WHERE logicalrelid = %d;",
relationId);
}
else
{
appendStringInfo(initiateTimeseriesPartitionsCommand, appendStringInfo(initiateTimeseriesPartitionsCommand,
"SELECT create_missing_partitions(logicalrelid, now() + partitioninterval * postmakeintervalcount, now() - partitioninterval * premakeintervalcount) from citus_timeseries.citus_timeseries_tables WHERE logicalrelid = %d;", "SELECT create_missing_partitions(logicalrelid, now() + partitioninterval * postmakeintervalcount, now() - partitioninterval * premakeintervalcount) from citus_timeseries.citus_timeseries_tables WHERE logicalrelid = %d;",
relationId); relationId);
}
int spiConnectionResult = SPI_connect(); int spiConnectionResult = SPI_connect();
if (spiConnectionResult != SPI_OK_CONNECT) if (spiConnectionResult != SPI_OK_CONNECT)
@ -192,8 +230,9 @@ InitiateTimeseriesTablePartitions(Oid relationId)
* Add tuples for the given table to the citus_timeseries_tables using given params * Add tuples for the given table to the citus_timeseries_tables using given params
*/ */
static void static void
InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval, int InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval,
preMakePartitionCount, int postMakePartitionCount, int postMakePartitionCount, int preMakePartitionCount,
TimestampTz startFrom,
Interval *compressionThresholdInterval, Interval *compressionThresholdInterval,
Interval *retentionThresholdInterval) Interval *retentionThresholdInterval)
{ {
@ -203,32 +242,40 @@ InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval, int
Relation citusTimeseriesTable = table_open(CitusTimeseriesTablesRelationId(), Relation citusTimeseriesTable = table_open(CitusTimeseriesTablesRelationId(),
RowExclusiveLock); RowExclusiveLock);
memset(newValues, 0, sizeof(newValues));
memset(isNulls, false, sizeof(isNulls)); memset(isNulls, false, sizeof(isNulls));
newValues[0] = ObjectIdGetDatum(relationId); newValues[0] = ObjectIdGetDatum(relationId);
newValues[1] = IntervalPGetDatum(partitionInterval); newValues[1] = IntervalPGetDatum(partitionInterval);
newValues[2] = Int32GetDatum(preMakePartitionCount); newValues[2] = Int32GetDatum(postMakePartitionCount);
newValues[3] = Int32GetDatum(postMakePartitionCount); newValues[3] = Int32GetDatum(preMakePartitionCount);
if (compressionThresholdInterval != NULL) if (startFrom != 0)
{ {
newValues[4] = IntervalPGetDatum(compressionThresholdInterval); newValues[4] = TimestampTzGetDatum(startFrom);
} }
else else
{ {
isNulls[4] = true; isNulls[4] = true;
} }
if (retentionThresholdInterval != NULL) if (compressionThresholdInterval != NULL)
{ {
newValues[5] = IntervalPGetDatum(retentionThresholdInterval); newValues[5] = IntervalPGetDatum(compressionThresholdInterval);
} }
else else
{ {
isNulls[5] = true; isNulls[5] = true;
} }
if (retentionThresholdInterval != NULL)
{
newValues[6] = IntervalPGetDatum(retentionThresholdInterval);
}
else
{
isNulls[6] = true;
}
HeapTuple newTuple = heap_form_tuple(RelationGetDescr(citusTimeseriesTable), HeapTuple newTuple = heap_form_tuple(RelationGetDescr(citusTimeseriesTable),
newValues, isNulls); newValues, isNulls);
CatalogTupleInsert(citusTimeseriesTable, newTuple); CatalogTupleInsert(citusTimeseriesTable, newTuple);

View File

@ -45,14 +45,16 @@ drop_timeseries_table(PG_FUNCTION_ARGS)
Oid relationId = PG_GETARG_OID(0); Oid relationId = PG_GETARG_OID(0);
ScanKeyData relIdKey[1]; ScanKeyData relIdKey[1];
Relation timeseriesRelation = table_open(CitusTimeseriesTablesRelationId(), AccessShareLock); Relation timeseriesRelation = table_open(CitusTimeseriesTablesRelationId(),
AccessShareLock);
ScanKeyInit(&relIdKey[0], ScanKeyInit(&relIdKey[0],
Anum_citus_timeseries_table_relation_id, Anum_citus_timeseries_table_relation_id,
BTEqualStrategyNumber, F_OIDEQ, BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(relationId)); ObjectIdGetDatum(relationId));
SysScanDesc timeseriesRelScan = systable_beginscan(timeseriesRelation, InvalidOid, false, NULL, 1, relIdKey); SysScanDesc timeseriesRelScan = systable_beginscan(timeseriesRelation, InvalidOid,
false, NULL, 1, relIdKey);
HeapTuple timeseriesTuple = systable_getnext(timeseriesRelScan); HeapTuple timeseriesTuple = systable_getnext(timeseriesRelScan);
if (HeapTupleIsValid(timeseriesTuple)) if (HeapTupleIsValid(timeseriesTuple))

View File

@ -6,8 +6,9 @@ SET search_path TO citus_timeseries;
CREATE TABLE citus_timeseries_tables ( CREATE TABLE citus_timeseries_tables (
logicalrelid regclass NOT NULL PRIMARY KEY, logicalrelid regclass NOT NULL PRIMARY KEY,
partitioninterval INTERVAL NOT NULL, partitioninterval INTERVAL NOT NULL,
premakeintervalcount INT NOT NULL,
postmakeintervalcount INT NOT NULL, postmakeintervalcount INT NOT NULL,
premakeintervalcount INT,
startfrom timestamptz,
compressionthreshold INTERVAL, compressionthreshold INTERVAL,
retentionthreshold INTERVAL retentionthreshold INTERVAL
); );
@ -19,6 +20,3 @@ GRANT USAGE ON SCHEMA citus_timeseries TO PUBLIC;
GRANT SELECT ON ALL tables IN SCHEMA citus_timeseries TO PUBLIC; GRANT SELECT ON ALL tables IN SCHEMA citus_timeseries TO PUBLIC;
RESET search_path; RESET search_path;
-- TODO: Add trigger to delete from here
-- TODO: Add trigger to unschedule cron jobs in future

View File

@ -1,23 +1,24 @@
CREATE OR REPLACE FUNCTION pg_catalog.create_timeseries_table( CREATE OR REPLACE FUNCTION pg_catalog.create_timeseries_table(
table_name regclass, table_name regclass,
partition_interval INTERVAL, partition_interval INTERVAL,
premake_interval_count int DEFAULT 7,
postmake_interval_count int DEFAULT 7, postmake_interval_count int DEFAULT 7,
premake_interval_count int DEFAULT NULL,
start_from timestamptz DEFAULT NULL,
compression_threshold INTERVAL DEFAULT NULL, compression_threshold INTERVAL DEFAULT NULL,
retention_threshold INTERVAL DEFAULT NULL) -- can change the order with compression, raise a message about dropping data retention_threshold INTERVAL DEFAULT NULL) -- can change the order with compression, raise a message about dropping data
RETURNS void RETURNS void
LANGUAGE C LANGUAGE C
AS 'MODULE_PATHNAME', 'create_timeseries_table'; AS 'MODULE_PATHNAME', 'create_timeseries_table';
COMMENT ON FUNCTION pg_catalog.create_timeseries_table( COMMENT ON FUNCTION pg_catalog.create_timeseries_table(
table_name regclass, table_name regclass,
partition_interval INTERVAL, partition_interval INTERVAL,
premake_interval_count int,
postmake_interval_count int, postmake_interval_count int,
premake_interval_count int,
start_from timestamptz,
compression_threshold INTERVAL, compression_threshold INTERVAL,
retention_threshold INTERVAL) retention_threshold INTERVAL)
IS 'creates a citus timeseries table which will be autopartitioned'; IS 'creates a citus timeseries table which will be autopartitioned';
-- Add UDF to undo that! - keep in mind but plan depending on the resources -- TODO: Add note about alter won't change existing tables
-- Add note about alter won't change existing tables
-- one function instead of multiple for compression and expiration

View File

@ -1,19 +1,22 @@
CREATE OR REPLACE FUNCTION pg_catalog.create_timeseries_table( CREATE OR REPLACE FUNCTION pg_catalog.create_timeseries_table(
table_name regclass, table_name regclass,
partition_interval INTERVAL, partition_interval INTERVAL,
premake_interval_count int DEFAULT 7,
postmake_interval_count int DEFAULT 7, postmake_interval_count int DEFAULT 7,
premake_interval_count int DEFAULT NULL,
start_from timestamptz DEFAULT NULL,
compression_threshold INTERVAL DEFAULT NULL, compression_threshold INTERVAL DEFAULT NULL,
retention_threshold INTERVAL DEFAULT NULL) -- can change the order with compression, raise a message about dropping data retention_threshold INTERVAL DEFAULT NULL) -- can change the order with compression, raise a message about dropping data
RETURNS void RETURNS void
LANGUAGE C LANGUAGE C
AS 'MODULE_PATHNAME', 'create_timeseries_table'; AS 'MODULE_PATHNAME', 'create_timeseries_table';
COMMENT ON FUNCTION pg_catalog.create_timeseries_table( COMMENT ON FUNCTION pg_catalog.create_timeseries_table(
table_name regclass, table_name regclass,
partition_interval INTERVAL, partition_interval INTERVAL,
premake_interval_count int,
postmake_interval_count int, postmake_interval_count int,
premake_interval_count int,
start_from timestamptz,
compression_threshold INTERVAL, compression_threshold INTERVAL,
retention_threshold INTERVAL) retention_threshold INTERVAL)
IS 'creates a citus timeseries table which will be autopartitioned'; IS 'creates a citus timeseries table which will be autopartitioned';

View File

@ -7,4 +7,3 @@ AS 'MODULE_PATHNAME', 'drop_timeseries_table';
COMMENT ON FUNCTION pg_catalog.drop_timeseries_table( COMMENT ON FUNCTION pg_catalog.drop_timeseries_table(
logicalrelid regclass) logicalrelid regclass)
IS 'drops a citus timeseries table by removing metadata'; IS 'drops a citus timeseries table by removing metadata';
-- TODO: Update comment for unscheduling

View File

@ -9,6 +9,7 @@ returns table(
LANGUAGE plpgsql LANGUAGE plpgsql
AS $$ AS $$
DECLARE DECLARE
current_partition_count int;
table_partition_interval INTERVAL; table_partition_interval INTERVAL;
table_partition_column_type_name text; table_partition_column_type_name text;
current_range_from_value timestamptz := NULL; current_range_from_value timestamptz := NULL;
@ -17,15 +18,8 @@ BEGIN
/* /*
* First check whether such timeseries table exists. If not, error out. * First check whether such timeseries table exists. If not, error out.
* *
* Then check if start_from is given. If it is, create the partition for that time * Then check if start_from is given. If it is, create the range using that time
* and let remaining of the function fill the gap from start_from to to_date. * and let remaining of the function fill the gap till to_date.
*
* TODO: That part is implemented by assuming that there is no partition exist for
* the given table, in other words that function will be called via create_timeseries_table
* only when start_from is given. That will be handled while adding starting time for old
* data ingestion. That part will be implemented with Task 1.2.
*
* TODO: Add tests to cover start_from for the PR to handle Task 1.2.
*/ */
SELECT partitioninterval SELECT partitioninterval
@ -37,15 +31,31 @@ BEGIN
RAISE '% must be timeseries table', table_name; RAISE '% must be timeseries table', table_name;
END IF; END IF;
-- Get datatype here to generate range values in the right data format
-- Since we already check that timeseries tables have single column to partition the table
-- we can directly get the 0th element of the partattrs column
SELECT atttypid::regtype::text INTO table_partition_column_type_name
FROM pg_attribute
WHERE attrelid = table_name::oid
AND attnum = (select partattrs[0] from pg_partitioned_table where partrelid = table_name::oid);
IF start_from IS NOT NULL THEN IF start_from IS NOT NULL THEN
SELECT count(*)
INTO current_partition_count
FROM pg_catalog.time_partitions
WHERE parent_table = table_name;
/*
* If any partition exists for the given table, we must start from the initial partition
* for that table and go backward to have consistent range values. Otherwise, if we start
* directly from the given start_from, we may end up with inconsistent range values.
*/
IF current_partition_count > 0 THEN
SELECT from_value::timestamptz, to_value::timestamptz
INTO current_range_from_value, current_range_to_value
FROM pg_catalog.time_partitions
WHERE parent_table = table_name
ORDER BY from_value::timestamptz;
WHILE current_range_from_value > start_from LOOP
current_range_from_value := current_range_from_value - table_partition_interval;
END LOOP;
current_range_to_value := current_range_from_value + table_partition_interval;
ELSE
/* /*
* Decide on the current_range_from_value of the initial partition according to interval of the timeseries table. * Decide on the current_range_from_value of the initial partition according to interval of the timeseries table.
* Since we will create all other partitions by adding intervals, truncating given start time will provide * Since we will create all other partitions by adding intervals, truncating given start time will provide
@ -66,24 +76,15 @@ BEGIN
END IF; END IF;
current_range_to_value := current_range_from_value + table_partition_interval; current_range_to_value := current_range_from_value + table_partition_interval;
-- TODO: Check for dynamic way to do it or create a function for it to use for the call at the end
IF table_partition_column_type_name = 'date' THEN
RETURN QUERY SELECT current_range_from_value::date::text, current_range_to_value::date::text;
ELSIF table_partition_column_type_name = 'timestamp without time zone' THEN
RETURN QUERY SELECT current_range_from_value::timestamp::text, current_range_to_value::timestamp::text;
ELSIF table_partition_column_type_name = 'timestamp with time zone' THEN
RETURN QUERY SELECT current_range_from_value::timestamptz::text, current_range_to_value::timestamptz::text;
ELSE
RAISE 'type of the partition column of the table % must be date, timestamp or timestamptz', table_name;
END IF; END IF;
END IF; END IF;
/* /*
* To be able to fill any gaps after the initial partition of the timeseries table, * To be able to fill any gaps after the initial partition of the timeseries table,
* we are starting from the first partition instead of the last. * we are starting from the first partition instead of the last. If start_from
* is given we've already used that to initiate ranges.
* *
* Also note that we must have either start_from or an initial partition for the timeseries * Note that we must have either start_from or an initial partition for the timeseries
* table, as we call that function while creating timeseries table first. * table, as we call that function while creating timeseries table first.
*/ */
IF current_range_from_value IS NULL AND current_range_to_value IS NULL THEN IF current_range_from_value IS NULL AND current_range_to_value IS NULL THEN
@ -94,23 +95,36 @@ BEGIN
ORDER BY from_value::timestamptz; ORDER BY from_value::timestamptz;
END IF; END IF;
WHILE current_range_to_value < to_date LOOP /*
current_range_from_value := current_range_to_value; * Get datatype here to generate range values in the right data format
current_range_to_value := current_range_to_value + table_partition_interval; * Since we already check that timeseries tables have single column to partition the table
* we can directly get the 0th element of the partattrs column
*/
SELECT atttypid::regtype::text INTO table_partition_column_type_name
FROM pg_attribute
WHERE attrelid = table_name::oid
AND attnum = (select partattrs[0] from pg_partitioned_table where partrelid = table_name::oid);
-- Check whether partition with given range has already been created WHILE current_range_from_value < to_date LOOP
-- Since partition interval can be given as, we are converting all variables to timestamptz to make sure /*
-- that we are comparing same type of parameters * Check whether partition with given range has already been created
* Since the partition column can be of type date, timestamp or timestamptz, we are converting all values to timestamptz
* to make sure that we are comparing values of the same type
*/
PERFORM * FROM pg_catalog.time_partitions WHERE from_value::timestamptz = current_range_from_value::timestamptz AND to_value::timestamptz = current_range_to_value::timestamptz; PERFORM * FROM pg_catalog.time_partitions WHERE from_value::timestamptz = current_range_from_value::timestamptz AND to_value::timestamptz = current_range_to_value::timestamptz;
IF found THEN IF found THEN
current_range_from_value := current_range_to_value;
current_range_to_value := current_range_to_value + table_partition_interval;
CONTINUE; CONTINUE;
END IF; END IF;
-- Check whether any other partition covers from_value or to_value /*
-- That means some partitions have been created manually and we must error out. * Check whether any other partition covers from_value or to_value
* That means some partitions have been created manually and we must error out.
*/
PERFORM * FROM pg_catalog.time_partitions PERFORM * FROM pg_catalog.time_partitions
WHERE (current_range_from_value::timestamptz >= from_value::timestamptz AND current_range_from_value < to_value::timestamptz) OR WHERE (current_range_from_value::timestamptz > from_value::timestamptz AND current_range_from_value < to_value::timestamptz) OR
(current_range_to_value::timestamptz >= from_value::timestamptz AND current_range_to_value::timestamptz < to_value::timestamptz); (current_range_to_value::timestamptz > from_value::timestamptz AND current_range_to_value::timestamptz < to_value::timestamptz);
IF found THEN IF found THEN
RAISE 'For the table % manual partition(s) has been created, Please remove them to continue using that table as timeseries table', table_name; RAISE 'For the table % manual partition(s) has been created, Please remove them to continue using that table as timeseries table', table_name;
END IF; END IF;
@ -124,6 +138,9 @@ BEGIN
ELSE ELSE
RAISE 'type of the partition column of the table % must be date, timestamp or timestamptz', table_name; RAISE 'type of the partition column of the table % must be date, timestamp or timestamptz', table_name;
END IF; END IF;
current_range_from_value := current_range_to_value;
current_range_to_value := current_range_to_value + table_partition_interval;
END LOOP; END LOOP;
RETURN; RETURN;

View File

@ -9,6 +9,7 @@ returns table(
LANGUAGE plpgsql LANGUAGE plpgsql
AS $$ AS $$
DECLARE DECLARE
current_partition_count int;
table_partition_interval INTERVAL; table_partition_interval INTERVAL;
table_partition_column_type_name text; table_partition_column_type_name text;
current_range_from_value timestamptz := NULL; current_range_from_value timestamptz := NULL;
@ -17,15 +18,8 @@ BEGIN
/* /*
* First check whether such timeseries table exists. If not, error out. * First check whether such timeseries table exists. If not, error out.
* *
* Then check if start_from is given. If it is, create the partition for that time * Then check if start_from is given. If it is, create the range using that time
* and let remaining of the function fill the gap from start_from to to_date. * and let remaining of the function fill the gap till to_date.
*
* TODO: That part is implemented by assuming that there is no partition exist for
* the given table, in other words that function will be called via create_timeseries_table
* only when start_from is given. That will be handled while adding starting time for old
* data ingestion. That part will be implemented with Task 1.2.
*
* TODO: Add tests to cover start_from for the PR to handle Task 1.2.
*/ */
SELECT partitioninterval SELECT partitioninterval
@ -37,15 +31,31 @@ BEGIN
RAISE '% must be timeseries table', table_name; RAISE '% must be timeseries table', table_name;
END IF; END IF;
-- Get datatype here to generate range values in the right data format
-- Since we already check that timeseries tables have single column to partition the table
-- we can directly get the 0th element of the partattrs column
SELECT atttypid::regtype::text INTO table_partition_column_type_name
FROM pg_attribute
WHERE attrelid = table_name::oid
AND attnum = (select partattrs[0] from pg_partitioned_table where partrelid = table_name::oid);
IF start_from IS NOT NULL THEN IF start_from IS NOT NULL THEN
SELECT count(*)
INTO current_partition_count
FROM pg_catalog.time_partitions
WHERE parent_table = table_name;
/*
* If any partition exists for the given table, we must start from the initial partition
* for that table and go backward to have consistent range values. Otherwise, if we start
* directly from the given start_from, we may end up with inconsistent range values.
*/
IF current_partition_count > 0 THEN
SELECT from_value::timestamptz, to_value::timestamptz
INTO current_range_from_value, current_range_to_value
FROM pg_catalog.time_partitions
WHERE parent_table = table_name
ORDER BY from_value::timestamptz;
WHILE current_range_from_value > start_from LOOP
current_range_from_value := current_range_from_value - table_partition_interval;
END LOOP;
current_range_to_value := current_range_from_value + table_partition_interval;
ELSE
/* /*
* Decide on the current_range_from_value of the initial partition according to interval of the timeseries table. * Decide on the current_range_from_value of the initial partition according to interval of the timeseries table.
* Since we will create all other partitions by adding intervals, truncating given start time will provide * Since we will create all other partitions by adding intervals, truncating given start time will provide
@ -66,24 +76,15 @@ BEGIN
END IF; END IF;
current_range_to_value := current_range_from_value + table_partition_interval; current_range_to_value := current_range_from_value + table_partition_interval;
-- TODO: Check for dynamic way to do it or create a function for it to use for the call at the end
IF table_partition_column_type_name = 'date' THEN
RETURN QUERY SELECT current_range_from_value::date::text, current_range_to_value::date::text;
ELSIF table_partition_column_type_name = 'timestamp without time zone' THEN
RETURN QUERY SELECT current_range_from_value::timestamp::text, current_range_to_value::timestamp::text;
ELSIF table_partition_column_type_name = 'timestamp with time zone' THEN
RETURN QUERY SELECT current_range_from_value::timestamptz::text, current_range_to_value::timestamptz::text;
ELSE
RAISE 'type of the partition column of the table % must be date, timestamp or timestamptz', table_name;
END IF; END IF;
END IF; END IF;
/* /*
* To be able to fill any gaps after the initial partition of the timeseries table, * To be able to fill any gaps after the initial partition of the timeseries table,
* we are starting from the first partition instead of the last. * we are starting from the first partition instead of the last. If start_from
* is given we've already used that to initiate ranges.
* *
* Also note that we must have either start_from or an initial partition for the timeseries * Note that we must have either start_from or an initial partition for the timeseries
* table, as we call that function while creating timeseries table first. * table, as we call that function while creating timeseries table first.
*/ */
IF current_range_from_value IS NULL AND current_range_to_value IS NULL THEN IF current_range_from_value IS NULL AND current_range_to_value IS NULL THEN
@ -94,23 +95,36 @@ BEGIN
ORDER BY from_value::timestamptz; ORDER BY from_value::timestamptz;
END IF; END IF;
WHILE current_range_to_value < to_date LOOP /*
current_range_from_value := current_range_to_value; * Get datatype here to generate range values in the right data format
current_range_to_value := current_range_to_value + table_partition_interval; * Since we already check that timeseries tables have single column to partition the table
* we can directly get the 0th element of the partattrs column
*/
SELECT atttypid::regtype::text INTO table_partition_column_type_name
FROM pg_attribute
WHERE attrelid = table_name::oid
AND attnum = (select partattrs[0] from pg_partitioned_table where partrelid = table_name::oid);
-- Check whether partition with given range has already been created WHILE current_range_from_value < to_date LOOP
-- Since partition interval can be given as, we are converting all variables to timestamptz to make sure /*
-- that we are comparing same type of parameters * Check whether partition with given range has already been created
* Since the partition column can be of type date, timestamp or timestamptz, we are converting all values to timestamptz
* to make sure that we are comparing values of the same type
*/
PERFORM * FROM pg_catalog.time_partitions WHERE from_value::timestamptz = current_range_from_value::timestamptz AND to_value::timestamptz = current_range_to_value::timestamptz; PERFORM * FROM pg_catalog.time_partitions WHERE from_value::timestamptz = current_range_from_value::timestamptz AND to_value::timestamptz = current_range_to_value::timestamptz;
IF found THEN IF found THEN
current_range_from_value := current_range_to_value;
current_range_to_value := current_range_to_value + table_partition_interval;
CONTINUE; CONTINUE;
END IF; END IF;
-- Check whether any other partition covers from_value or to_value /*
-- That means some partitions have been created manually and we must error out. * Check whether any other partition covers from_value or to_value
* That means some partitions have been created manually and we must error out.
*/
PERFORM * FROM pg_catalog.time_partitions PERFORM * FROM pg_catalog.time_partitions
WHERE (current_range_from_value::timestamptz >= from_value::timestamptz AND current_range_from_value < to_value::timestamptz) OR WHERE (current_range_from_value::timestamptz > from_value::timestamptz AND current_range_from_value < to_value::timestamptz) OR
(current_range_to_value::timestamptz >= from_value::timestamptz AND current_range_to_value::timestamptz < to_value::timestamptz); (current_range_to_value::timestamptz > from_value::timestamptz AND current_range_to_value::timestamptz < to_value::timestamptz);
IF found THEN IF found THEN
RAISE 'For the table % manual partition(s) has been created, Please remove them to continue using that table as timeseries table', table_name; RAISE 'For the table % manual partition(s) has been created, Please remove them to continue using that table as timeseries table', table_name;
END IF; END IF;
@ -124,6 +138,9 @@ BEGIN
ELSE ELSE
RAISE 'type of the partition column of the table % must be date, timestamp or timestamptz', table_name; RAISE 'type of the partition column of the table % must be date, timestamp or timestamptz', table_name;
END IF; END IF;
current_range_from_value := current_range_to_value;
current_range_to_value := current_range_to_value + table_partition_interval;
END LOOP; END LOOP;
RETURN; RETURN;

View File

@ -16,13 +16,14 @@
#include "server/datatype/timestamp.h" #include "server/datatype/timestamp.h"
#include "server/partitioning/partdefs.h" #include "server/partitioning/partdefs.h"
#define Natts_citus_timeseries_tables 6 #define Natts_citus_timeseries_tables 7
#define Anum_citus_timeseries_table_relation_id 1 #define Anum_citus_timeseries_table_relation_id 1
#define Anum_citus_timeseries_table_partition_interval 2 #define Anum_citus_timeseries_table_partition_interval 2
#define Anum_citus_timeseries_table_premake_interval_count 3 #define Anum_citus_timeseries_table_postmake_interval_count 3
#define Anum_citus_timeseries_table_postmake_interval_count 4 #define Anum_citus_timeseries_table_premake_interval_count 4
#define Anum_citus_timeseries_table_compression_threshold 5 #define Anum_citus_timeseries_table_start_from 5
#define Anum_citus_timeseries_table_retention_threshold 6 #define Anum_citus_timeseries_table_compression_threshold 6
#define Anum_citus_timeseries_table_retention_threshold 7
#define INTERVAL_TO_SEC(ivp) \ #define INTERVAL_TO_SEC(ivp) \
(((double) (ivp)->time) / ((double) USECS_PER_SEC) + \ (((double) (ivp)->time) / ((double) USECS_PER_SEC) + \