From 787f76a95247154a654f1c3707ffcccf9d1c46ce Mon Sep 17 00:00:00 2001 From: Burak Velioglu Date: Fri, 20 Aug 2021 12:11:16 +0300 Subject: [PATCH] citus_indent --- .../timeseries/create_timeseries_table.c | 106 ++++++++++++------ src/backend/timeseries/timeseries_utils.c | 40 +++++-- src/include/timeseries/timeseries_utils.h | 7 +- 3 files changed, 103 insertions(+), 50 deletions(-) diff --git a/src/backend/timeseries/create_timeseries_table.c b/src/backend/timeseries/create_timeseries_table.c index f0183209f..70b10cd80 100644 --- a/src/backend/timeseries/create_timeseries_table.c +++ b/src/backend/timeseries/create_timeseries_table.c @@ -37,32 +37,37 @@ PG_FUNCTION_INFO_V1(create_timeseries_table); static void InitiateTimeseriesTablePartitions(Oid relationId); -static void InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval, int preMakePartitionCount, - int postMakePartitionCount, Interval *compressionThresholdInterval, +static void InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval, + int preMakePartitionCount, + int postMakePartitionCount, + Interval *compressionThresholdInterval, Interval *retentionThresholdInterval); -static void ErrorIfNotSuitableToConvertTimeseriesTable(Oid relationId, Interval *partitionInterval, - Interval *compresstionThresholdInterval, - Interval *retentionThresholdInterval); +static void ErrorIfNotSuitableToConvertTimeseriesTable(Oid relationId, + Interval *partitionInterval, + Interval * + compresstionThresholdInterval, + Interval * + retentionThresholdInterval); /* - * create_timeseries_table gets a table name, partition interval + * create_timeseries_table gets a table name, partition interval * optional pre and post make partition counts, compression and retention threshold * then it creates a timeseries table. */ Datum create_timeseries_table(PG_FUNCTION_ARGS) { - CheckCitusVersion(ERROR); + CheckCitusVersion(ERROR); - if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) + if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) { PG_RETURN_VOID(); } Oid relationId = PG_GETARG_OID(0); Interval *partitionInterval = PG_GETARG_INTERVAL_P(1); - int preMakePartitionCount = PG_GETARG_INT32(2); - int postMakePartitionCount = PG_GETARG_INT32(3); + int preMakePartitionCount = PG_GETARG_INT32(2); + int postMakePartitionCount = PG_GETARG_INT32(3); Interval *compressionThresholdInterval = NULL; Interval *retentionThresholdInterval = NULL; @@ -76,33 +81,42 @@ create_timeseries_table(PG_FUNCTION_ARGS) retentionThresholdInterval = DatumGetIntervalP(PG_GETARG_DATUM(5)); } - ErrorIfNotSuitableToConvertTimeseriesTable(relationId, partitionInterval, compressionThresholdInterval, retentionThresholdInterval); + ErrorIfNotSuitableToConvertTimeseriesTable(relationId, partitionInterval, + compressionThresholdInterval, + retentionThresholdInterval); - InsertIntoCitusTimeseriesTables(relationId, partitionInterval, preMakePartitionCount, - postMakePartitionCount, compressionThresholdInterval, retentionThresholdInterval); + InsertIntoCitusTimeseriesTables(relationId, partitionInterval, preMakePartitionCount, + postMakePartitionCount, compressionThresholdInterval, + retentionThresholdInterval); InitiateTimeseriesTablePartitions(relationId); PG_RETURN_VOID(); } + /* * Check whether the given table and intervals are suitable to convert a table to timeseries table, * if not error out. */ -static void ErrorIfNotSuitableToConvertTimeseriesTable(Oid relationId, Interval *partitionInterval, Interval *compressionThresholdInterval, Interval *retentionThresholdInterval) +static void +ErrorIfNotSuitableToConvertTimeseriesTable(Oid relationId, Interval *partitionInterval, + Interval *compressionThresholdInterval, + Interval *retentionThresholdInterval) { Relation pgPartitionedTableRelation; PartitionKey partitionKey; - if(!TableEmpty(relationId)) + if (!TableEmpty(relationId)) { - ereport(ERROR, (errmsg("non-empty tables can not be converted to timeseries table"))); + ereport(ERROR, (errmsg("non-empty tables can not be converted to " + "timeseries table"))); } - if(!PartitionedTable(relationId)) + if (!PartitionedTable(relationId)) { - ereport(ERROR, (errmsg("non-partitioned tables can not be converted to timeseries table"))); + ereport(ERROR, (errmsg("non-partitioned tables can not be converted " + "to timeseries table"))); } pgPartitionedTableRelation = table_open(relationId, AccessShareLock); @@ -110,37 +124,49 @@ static void ErrorIfNotSuitableToConvertTimeseriesTable(Oid relationId, Interval if (partitionKey->strategy != PARTITION_STRATEGY_RANGE) { - ereport(ERROR, (errmsg("table must be partitioned by range to convert it to timeseries table"))); + ereport(ERROR, (errmsg("table must be partitioned by range to convert " + "it to timeseries table"))); } if (partitionKey->partnatts != 1) { - ereport(ERROR, (errmsg("table must be partitioned by single column to convert it to timeseries table"))); + ereport(ERROR, (errmsg("table must be partitioned by single column to " + "convert it to timeseries table"))); } - if (!CheckIntervalAlignmentWithThresholds(partitionInterval, compressionThresholdInterval, retentionThresholdInterval)) + if (!CheckIntervalAlignmentWithThresholds(partitionInterval, + compressionThresholdInterval, + retentionThresholdInterval)) { - ereport(ERROR, (errmsg("retention threshold must be greater than compression threshold and compresstion threshold must be greater than partition interval"))); + ereport(ERROR, (errmsg("retention threshold must be greater than " + "compression threshold and compression threshold " + "must be greater than partition interval"))); } if (!CheckIntervalAlignnmentWithPartitionKey(partitionKey, partitionInterval)) { - ereport(ERROR, (errmsg("partition interval for table partitioned on date must be multiple days"))); + ereport(ERROR, (errmsg("partition interval for table partitioned on " + "date must be multiple days"))); } - if (compressionThresholdInterval != NULL && !CheckIntervalAlignnmentWithPartitionKey(partitionKey, compressionThresholdInterval)) + if (compressionThresholdInterval != NULL && !CheckIntervalAlignnmentWithPartitionKey( + partitionKey, compressionThresholdInterval)) { - ereport(ERROR, (errmsg("compression threshold interval for table partitioned on date must be multiple days"))); + ereport(ERROR, (errmsg("compression threshold interval for table " + "partitioned on date must be multiple days"))); } - if (retentionThresholdInterval != NULL && !CheckIntervalAlignnmentWithPartitionKey(partitionKey, retentionThresholdInterval)) + if (retentionThresholdInterval != NULL && !CheckIntervalAlignnmentWithPartitionKey( + partitionKey, retentionThresholdInterval)) { - ereport(ERROR, (errmsg("retention threshold interval for table partitioned on date must be multiple days"))); + ereport(ERROR, (errmsg("retention threshold interval for table " + "partitioned on date must be multiple days"))); } table_close(pgPartitionedTableRelation, NoLock); } + /* * Create the initial pre and post make partitions for the given relation id * by getting the related information from citus_timeseries_tables and utilizing @@ -151,12 +177,15 @@ InitiateTimeseriesTablePartitions(Oid relationId) { bool readOnly = false; StringInfo initiateTimeseriesPartitionsCommand = makeStringInfo(); - appendStringInfo(initiateTimeseriesPartitionsCommand, "SELECT create_missing_partitions(logicalrelid, now() + partitioninterval * postmakeintervalcount, now() - partitioninterval * premakeintervalcount) from citus_timeseries.citus_timeseries_tables WHERE logicalrelid = %d;", relationId); + appendStringInfo(initiateTimeseriesPartitionsCommand, + "SELECT create_missing_partitions(logicalrelid, now() + partitioninterval * postmakeintervalcount, now() - partitioninterval * premakeintervalcount) from citus_timeseries.citus_timeseries_tables WHERE logicalrelid = %d;", + relationId); int spiConnectionResult = SPI_connect(); if (spiConnectionResult != SPI_OK_CONNECT) { - ereport(WARNING, (errmsg("could not connect to SPI manager to initiate timeseries table partitions"))); + ereport(WARNING, (errmsg("could not connect to SPI manager to " + "initiate timeseries table partitions"))); SPI_finish(); } @@ -164,26 +193,30 @@ InitiateTimeseriesTablePartitions(Oid relationId) SPI_finish(); } + /* * Add tuples for the given table to the citus_timeseries_tables using given params */ static void -InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval, int preMakePartitionCount, int postMakePartitionCount, - Interval *compressionThresholdInterval, Interval *retentionThresholdInterval) +InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval, int + preMakePartitionCount, int postMakePartitionCount, + Interval *compressionThresholdInterval, + Interval *retentionThresholdInterval) { Datum newValues[Natts_citus_timeseries_tables]; bool isNulls[Natts_citus_timeseries_tables]; - Relation citusTimeseriesTable = table_open(CitusTimeseriesTablesRelationId(), RowExclusiveLock); + Relation citusTimeseriesTable = table_open(CitusTimeseriesTablesRelationId(), + RowExclusiveLock); memset(newValues, 0, sizeof(newValues)); memset(isNulls, false, sizeof(isNulls)); newValues[0] = ObjectIdGetDatum(relationId); newValues[1] = IntervalPGetDatum(partitionInterval); - newValues[2] = Int32GetDatum(preMakePartitionCount); - newValues[3] = Int32GetDatum(postMakePartitionCount); - + newValues[2] = Int32GetDatum(preMakePartitionCount); + newValues[3] = Int32GetDatum(postMakePartitionCount); + if (compressionThresholdInterval != NULL) { newValues[4] = IntervalPGetDatum(compressionThresholdInterval); @@ -202,7 +235,8 @@ InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval, int isNulls[5] = true; } - HeapTuple newTuple = heap_form_tuple(RelationGetDescr(citusTimeseriesTable), newValues, isNulls); + HeapTuple newTuple = heap_form_tuple(RelationGetDescr(citusTimeseriesTable), + newValues, isNulls); CatalogTupleInsert(citusTimeseriesTable, newTuple); table_close(citusTimeseriesTable, NoLock); diff --git a/src/backend/timeseries/timeseries_utils.c b/src/backend/timeseries/timeseries_utils.c index b152457e6..a90080732 100644 --- a/src/backend/timeseries/timeseries_utils.c +++ b/src/backend/timeseries/timeseries_utils.c @@ -33,15 +33,18 @@ Oid CitusTimeseriesTablesRelationId() { - Oid relationId = get_relname_relid("citus_timeseries_tables", TimeseriesNamespaceId()); + Oid relationId = get_relname_relid("citus_timeseries_tables", + TimeseriesNamespaceId()); if (relationId == InvalidOid) { - ereport(ERROR, (errmsg("cache lookup failed for citus_timeseries_tables, called too early?"))); + ereport(ERROR, (errmsg( + "cache lookup failed for citus_timeseries_tables, called too early?"))); } return relationId; } + /* * TimeseriesNamespaceId returns namespace id of the schema we store timeseries * related metadata tables. @@ -52,25 +55,38 @@ TimeseriesNamespaceId() return get_namespace_oid("citus_timeseries", false); } + /* * Compare partition interval, compression threshold and retenetion threshold. Note that * compression threshold or retention threshold can be null. */ bool -CheckIntervalAlignmentWithThresholds(Interval *partitionInterval, Interval *compressionThreshold, Interval *retentionThreshold) +CheckIntervalAlignmentWithThresholds(Interval *partitionInterval, + Interval *compressionThreshold, + Interval *retentionThreshold) { - bool compressionGreaterThanInterval = compressionThreshold == NULL ? true : INTERVAL_TO_SEC(compressionThreshold) > INTERVAL_TO_SEC(partitionInterval); - bool retentionGreaterThanInterval = retentionThreshold == NULL ? true : INTERVAL_TO_SEC(retentionThreshold) > INTERVAL_TO_SEC(partitionInterval); - bool retentionGreaterThanCompression = compressionThreshold == NULL || retentionThreshold == NULL ? true : INTERVAL_TO_SEC(retentionThreshold) > INTERVAL_TO_SEC(compressionThreshold); + bool compressionGreaterThanInterval = compressionThreshold == NULL ? true : + INTERVAL_TO_SEC(compressionThreshold) > + INTERVAL_TO_SEC(partitionInterval); + bool retentionGreaterThanInterval = retentionThreshold == NULL ? true : + INTERVAL_TO_SEC(retentionThreshold) > + INTERVAL_TO_SEC(partitionInterval); + bool retentionGreaterThanCompression = compressionThreshold == NULL || + retentionThreshold == NULL ? true : + INTERVAL_TO_SEC(retentionThreshold) > + INTERVAL_TO_SEC(compressionThreshold); - return compressionGreaterThanInterval && retentionGreaterThanInterval && retentionGreaterThanCompression; + return compressionGreaterThanInterval && retentionGreaterThanInterval && + retentionGreaterThanCompression; } + /* * Check whether the given partition interval aligns with the partition column of the table. */ bool -CheckIntervalAlignnmentWithPartitionKey(PartitionKey partitionKey, Interval *partitionInterval) +CheckIntervalAlignnmentWithPartitionKey(PartitionKey partitionKey, + Interval *partitionInterval) { Oid partTypeId; HeapTuple typeTuple; @@ -81,18 +97,18 @@ CheckIntervalAlignnmentWithPartitionKey(PartitionKey partitionKey, Interval *par typeForm = (Form_pg_type) GETSTRUCT(typeTuple); ReleaseSysCache(typeTuple); - if(strncmp(typeForm->typname.data, "date", NAMEDATALEN) == 0) + if (strncmp(typeForm->typname.data, "date", NAMEDATALEN) == 0) { if (partitionInterval->time == 0) { return true; } } - else if(strncmp(typeForm->typname.data, "timestamp", NAMEDATALEN) == 0 || - strncmp(typeForm->typname.data, "timestamptz", NAMEDATALEN) == 0) + else if (strncmp(typeForm->typname.data, "timestamp", NAMEDATALEN) == 0 || + strncmp(typeForm->typname.data, "timestamptz", NAMEDATALEN) == 0) { return true; } return false; -} \ No newline at end of file +} diff --git a/src/include/timeseries/timeseries_utils.h b/src/include/timeseries/timeseries_utils.h index e04a53dac..812348fa8 100644 --- a/src/include/timeseries/timeseries_utils.h +++ b/src/include/timeseries/timeseries_utils.h @@ -18,7 +18,10 @@ extern Oid CitusTimeseriesTablesRelationId(void); extern Oid TimeseriesNamespaceId(void); -extern bool CheckIntervalAlignmentWithThresholds(Interval *partitionInterval, Interval *compressionThreshold, Interval *retentionThreshold); -extern bool CheckIntervalAlignnmentWithPartitionKey(PartitionKey partitionKey, Interval *partitionInterval); +extern bool CheckIntervalAlignmentWithThresholds(Interval *partitionInterval, + Interval *compressionThreshold, + Interval *retentionThreshold); +extern bool CheckIntervalAlignnmentWithPartitionKey(PartitionKey partitionKey, + Interval *partitionInterval); #endif /* TIMESERIES_UTILS_H_ */