citus_indent

velioglu/create_timeseries_table
Burak Velioglu 2021-08-20 12:11:16 +03:00
parent ce12ab5570
commit 787f76a952
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
3 changed files with 103 additions and 50 deletions

View File

@ -37,12 +37,17 @@
PG_FUNCTION_INFO_V1(create_timeseries_table);
static void InitiateTimeseriesTablePartitions(Oid relationId);
static void InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval, int preMakePartitionCount,
int postMakePartitionCount, Interval *compressionThresholdInterval,
Interval *retentionThresholdInterval);
static void ErrorIfNotSuitableToConvertTimeseriesTable(Oid relationId, Interval *partitionInterval,
Interval *compresstionThresholdInterval,
static void InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval,
int preMakePartitionCount,
int postMakePartitionCount,
Interval *compressionThresholdInterval,
Interval *retentionThresholdInterval);
static void ErrorIfNotSuitableToConvertTimeseriesTable(Oid relationId,
Interval *partitionInterval,
Interval *
compresstionThresholdInterval,
Interval *
retentionThresholdInterval);
/*
* create_timeseries_table gets a table name, partition interval
@ -76,33 +81,42 @@ create_timeseries_table(PG_FUNCTION_ARGS)
retentionThresholdInterval = DatumGetIntervalP(PG_GETARG_DATUM(5));
}
ErrorIfNotSuitableToConvertTimeseriesTable(relationId, partitionInterval, compressionThresholdInterval, retentionThresholdInterval);
ErrorIfNotSuitableToConvertTimeseriesTable(relationId, partitionInterval,
compressionThresholdInterval,
retentionThresholdInterval);
InsertIntoCitusTimeseriesTables(relationId, partitionInterval, preMakePartitionCount,
postMakePartitionCount, compressionThresholdInterval, retentionThresholdInterval);
postMakePartitionCount, compressionThresholdInterval,
retentionThresholdInterval);
InitiateTimeseriesTablePartitions(relationId);
PG_RETURN_VOID();
}
/*
* Check whether the given table and intervals are suitable to convert a table to timeseries table,
* if not error out.
*/
static void ErrorIfNotSuitableToConvertTimeseriesTable(Oid relationId, Interval *partitionInterval, Interval *compressionThresholdInterval, Interval *retentionThresholdInterval)
static void
ErrorIfNotSuitableToConvertTimeseriesTable(Oid relationId, Interval *partitionInterval,
Interval *compressionThresholdInterval,
Interval *retentionThresholdInterval)
{
Relation pgPartitionedTableRelation;
PartitionKey partitionKey;
if (!TableEmpty(relationId))
{
ereport(ERROR, (errmsg("non-empty tables can not be converted to timeseries table")));
ereport(ERROR, (errmsg("non-empty tables can not be converted to "
"timeseries table")));
}
if (!PartitionedTable(relationId))
{
ereport(ERROR, (errmsg("non-partitioned tables can not be converted to timeseries table")));
ereport(ERROR, (errmsg("non-partitioned tables can not be converted "
"to timeseries table")));
}
pgPartitionedTableRelation = table_open(relationId, AccessShareLock);
@ -110,37 +124,49 @@ static void ErrorIfNotSuitableToConvertTimeseriesTable(Oid relationId, Interval
if (partitionKey->strategy != PARTITION_STRATEGY_RANGE)
{
ereport(ERROR, (errmsg("table must be partitioned by range to convert it to timeseries table")));
ereport(ERROR, (errmsg("table must be partitioned by range to convert "
"it to timeseries table")));
}
if (partitionKey->partnatts != 1)
{
ereport(ERROR, (errmsg("table must be partitioned by single column to convert it to timeseries table")));
ereport(ERROR, (errmsg("table must be partitioned by single column to "
"convert it to timeseries table")));
}
if (!CheckIntervalAlignmentWithThresholds(partitionInterval, compressionThresholdInterval, retentionThresholdInterval))
if (!CheckIntervalAlignmentWithThresholds(partitionInterval,
compressionThresholdInterval,
retentionThresholdInterval))
{
ereport(ERROR, (errmsg("retention threshold must be greater than compression threshold and compresstion threshold must be greater than partition interval")));
ereport(ERROR, (errmsg("retention threshold must be greater than "
"compression threshold and compression threshold "
"must be greater than partition interval")));
}
if (!CheckIntervalAlignnmentWithPartitionKey(partitionKey, partitionInterval))
{
ereport(ERROR, (errmsg("partition interval for table partitioned on date must be multiple days")));
ereport(ERROR, (errmsg("partition interval for table partitioned on "
"date must be multiple days")));
}
if (compressionThresholdInterval != NULL && !CheckIntervalAlignnmentWithPartitionKey(partitionKey, compressionThresholdInterval))
if (compressionThresholdInterval != NULL && !CheckIntervalAlignnmentWithPartitionKey(
partitionKey, compressionThresholdInterval))
{
ereport(ERROR, (errmsg("compression threshold interval for table partitioned on date must be multiple days")));
ereport(ERROR, (errmsg("compression threshold interval for table "
"partitioned on date must be multiple days")));
}
if (retentionThresholdInterval != NULL && !CheckIntervalAlignnmentWithPartitionKey(partitionKey, retentionThresholdInterval))
if (retentionThresholdInterval != NULL && !CheckIntervalAlignnmentWithPartitionKey(
partitionKey, retentionThresholdInterval))
{
ereport(ERROR, (errmsg("retention threshold interval for table partitioned on date must be multiple days")));
ereport(ERROR, (errmsg("retention threshold interval for table "
"partitioned on date must be multiple days")));
}
table_close(pgPartitionedTableRelation, NoLock);
}
/*
* Create the initial pre and post make partitions for the given relation id
* by getting the related information from citus_timeseries_tables and utilizing
@ -151,12 +177,15 @@ InitiateTimeseriesTablePartitions(Oid relationId)
{
bool readOnly = false;
StringInfo initiateTimeseriesPartitionsCommand = makeStringInfo();
appendStringInfo(initiateTimeseriesPartitionsCommand, "SELECT create_missing_partitions(logicalrelid, now() + partitioninterval * postmakeintervalcount, now() - partitioninterval * premakeintervalcount) from citus_timeseries.citus_timeseries_tables WHERE logicalrelid = %d;", relationId);
appendStringInfo(initiateTimeseriesPartitionsCommand,
"SELECT create_missing_partitions(logicalrelid, now() + partitioninterval * postmakeintervalcount, now() - partitioninterval * premakeintervalcount) from citus_timeseries.citus_timeseries_tables WHERE logicalrelid = %d;",
relationId);
int spiConnectionResult = SPI_connect();
if (spiConnectionResult != SPI_OK_CONNECT)
{
ereport(WARNING, (errmsg("could not connect to SPI manager to initiate timeseries table partitions")));
ereport(WARNING, (errmsg("could not connect to SPI manager to "
"initiate timeseries table partitions")));
SPI_finish();
}
@ -164,17 +193,21 @@ InitiateTimeseriesTablePartitions(Oid relationId)
SPI_finish();
}
/*
* Add tuples for the given table to the citus_timeseries_tables using given params
*/
static void
InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval, int preMakePartitionCount, int postMakePartitionCount,
Interval *compressionThresholdInterval, Interval *retentionThresholdInterval)
InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval, int
preMakePartitionCount, int postMakePartitionCount,
Interval *compressionThresholdInterval,
Interval *retentionThresholdInterval)
{
Datum newValues[Natts_citus_timeseries_tables];
bool isNulls[Natts_citus_timeseries_tables];
Relation citusTimeseriesTable = table_open(CitusTimeseriesTablesRelationId(), RowExclusiveLock);
Relation citusTimeseriesTable = table_open(CitusTimeseriesTablesRelationId(),
RowExclusiveLock);
memset(newValues, 0, sizeof(newValues));
memset(isNulls, false, sizeof(isNulls));
@ -202,7 +235,8 @@ InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval, int
isNulls[5] = true;
}
HeapTuple newTuple = heap_form_tuple(RelationGetDescr(citusTimeseriesTable), newValues, isNulls);
HeapTuple newTuple = heap_form_tuple(RelationGetDescr(citusTimeseriesTable),
newValues, isNulls);
CatalogTupleInsert(citusTimeseriesTable, newTuple);
table_close(citusTimeseriesTable, NoLock);

View File

@ -33,15 +33,18 @@
Oid
CitusTimeseriesTablesRelationId()
{
Oid relationId = get_relname_relid("citus_timeseries_tables", TimeseriesNamespaceId());
Oid relationId = get_relname_relid("citus_timeseries_tables",
TimeseriesNamespaceId());
if (relationId == InvalidOid)
{
ereport(ERROR, (errmsg("cache lookup failed for citus_timeseries_tables, called too early?")));
ereport(ERROR, (errmsg(
"cache lookup failed for citus_timeseries_tables, called too early?")));
}
return relationId;
}
/*
* TimeseriesNamespaceId returns namespace id of the schema we store timeseries
* related metadata tables.
@ -52,25 +55,38 @@ TimeseriesNamespaceId()
return get_namespace_oid("citus_timeseries", false);
}
/*
* Compare partition interval, compression threshold and retenetion threshold. Note that
* compression threshold or retention threshold can be null.
*/
bool
CheckIntervalAlignmentWithThresholds(Interval *partitionInterval, Interval *compressionThreshold, Interval *retentionThreshold)
CheckIntervalAlignmentWithThresholds(Interval *partitionInterval,
Interval *compressionThreshold,
Interval *retentionThreshold)
{
bool compressionGreaterThanInterval = compressionThreshold == NULL ? true : INTERVAL_TO_SEC(compressionThreshold) > INTERVAL_TO_SEC(partitionInterval);
bool retentionGreaterThanInterval = retentionThreshold == NULL ? true : INTERVAL_TO_SEC(retentionThreshold) > INTERVAL_TO_SEC(partitionInterval);
bool retentionGreaterThanCompression = compressionThreshold == NULL || retentionThreshold == NULL ? true : INTERVAL_TO_SEC(retentionThreshold) > INTERVAL_TO_SEC(compressionThreshold);
bool compressionGreaterThanInterval = compressionThreshold == NULL ? true :
INTERVAL_TO_SEC(compressionThreshold) >
INTERVAL_TO_SEC(partitionInterval);
bool retentionGreaterThanInterval = retentionThreshold == NULL ? true :
INTERVAL_TO_SEC(retentionThreshold) >
INTERVAL_TO_SEC(partitionInterval);
bool retentionGreaterThanCompression = compressionThreshold == NULL ||
retentionThreshold == NULL ? true :
INTERVAL_TO_SEC(retentionThreshold) >
INTERVAL_TO_SEC(compressionThreshold);
return compressionGreaterThanInterval && retentionGreaterThanInterval && retentionGreaterThanCompression;
return compressionGreaterThanInterval && retentionGreaterThanInterval &&
retentionGreaterThanCompression;
}
/*
* Check whether the given partition interval aligns with the partition column of the table.
*/
bool
CheckIntervalAlignnmentWithPartitionKey(PartitionKey partitionKey, Interval *partitionInterval)
CheckIntervalAlignnmentWithPartitionKey(PartitionKey partitionKey,
Interval *partitionInterval)
{
Oid partTypeId;
HeapTuple typeTuple;

View File

@ -18,7 +18,10 @@
extern Oid CitusTimeseriesTablesRelationId(void);
extern Oid TimeseriesNamespaceId(void);
extern bool CheckIntervalAlignmentWithThresholds(Interval *partitionInterval, Interval *compressionThreshold, Interval *retentionThreshold);
extern bool CheckIntervalAlignnmentWithPartitionKey(PartitionKey partitionKey, Interval *partitionInterval);
extern bool CheckIntervalAlignmentWithThresholds(Interval *partitionInterval,
Interval *compressionThreshold,
Interval *retentionThreshold);
extern bool CheckIntervalAlignnmentWithPartitionKey(PartitionKey partitionKey,
Interval *partitionInterval);
#endif /* TIMESERIES_UTILS_H_ */