mirror of https://github.com/citusdata/citus.git
Fix tests and adress reviews
parent
e4fbcfe3cf
commit
22dcc251c3
|
@ -192,7 +192,7 @@ ErrorIfNotSuitableToConvertTimeseriesTable(Oid relationId, Interval *partitionIn
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Create the initial pre and post make partitions for the given relation id
|
* Create the initial pre and post make partitions for the given relation id
|
||||||
* by getting the related information from citus_timeseries_tables and utilizing
|
* by getting the related information from timeseries.tables and utilizing
|
||||||
* create_missing_partitions
|
* create_missing_partitions
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
|
@ -209,13 +209,25 @@ InitiateTimeseriesTablePartitions(Oid relationId, bool useStartFrom)
|
||||||
if (useStartFrom)
|
if (useStartFrom)
|
||||||
{
|
{
|
||||||
appendStringInfo(initiateTimeseriesPartitionsCommand,
|
appendStringInfo(initiateTimeseriesPartitionsCommand,
|
||||||
"SELECT create_missing_partitions(logicalrelid, now() + partitioninterval * postmakeintervalcount, startfrom) from citus_timeseries.citus_timeseries_tables WHERE logicalrelid = %d;",
|
"SELECT "
|
||||||
|
"pg_catalog.create_missing_partitions("
|
||||||
|
"logicalrelid,"
|
||||||
|
"now() + partitioninterval * postmakeintervalcount,"
|
||||||
|
"startfrom) "
|
||||||
|
"FROM timeseries.tables "
|
||||||
|
"WHERE logicalrelid = %d;",
|
||||||
relationId);
|
relationId);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
appendStringInfo(initiateTimeseriesPartitionsCommand,
|
appendStringInfo(initiateTimeseriesPartitionsCommand,
|
||||||
"SELECT create_missing_partitions(logicalrelid, now() + partitioninterval * postmakeintervalcount, now() - partitioninterval * premakeintervalcount) from citus_timeseries.citus_timeseries_tables WHERE logicalrelid = %d;",
|
"SELECT "
|
||||||
|
"pg_catalog.create_missing_partitions("
|
||||||
|
"logicalrelid,"
|
||||||
|
"now() + partitioninterval * postmakeintervalcount,"
|
||||||
|
"now() - partitioninterval * premakeintervalcount) "
|
||||||
|
"FROM timeseries.tables "
|
||||||
|
"WHERE logicalrelid = %d;",
|
||||||
relationId);
|
relationId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -233,7 +245,7 @@ InitiateTimeseriesTablePartitions(Oid relationId, bool useStartFrom)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Add tuples for the given table to the citus_timeseries_tables using given params
|
* Add tuples for the given table to the timeseries.tables using given params
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval,
|
InsertIntoCitusTimeseriesTables(Oid relationId, Interval *partitionInterval,
|
||||||
|
|
|
@ -46,15 +46,16 @@ drop_timeseries_table(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
ScanKeyData relIdKey[1];
|
ScanKeyData relIdKey[1];
|
||||||
Relation timeseriesRelation = table_open(CitusTimeseriesTablesRelationId(),
|
Relation timeseriesRelation = table_open(CitusTimeseriesTablesRelationId(),
|
||||||
AccessShareLock);
|
RowExclusiveLock);
|
||||||
|
Oid pkeyOid = CitusTimeseriesTablesPKeyIndexRelationId();
|
||||||
|
|
||||||
ScanKeyInit(&relIdKey[0],
|
ScanKeyInit(&relIdKey[0],
|
||||||
Anum_citus_timeseries_table_relation_id,
|
Anum_citus_timeseries_table_relation_id,
|
||||||
BTEqualStrategyNumber, F_OIDEQ,
|
BTEqualStrategyNumber, F_OIDEQ,
|
||||||
ObjectIdGetDatum(relationId));
|
ObjectIdGetDatum(relationId));
|
||||||
|
|
||||||
SysScanDesc timeseriesRelScan = systable_beginscan(timeseriesRelation, InvalidOid,
|
SysScanDesc timeseriesRelScan = systable_beginscan(timeseriesRelation, pkeyOid,
|
||||||
false, NULL, 1, relIdKey);
|
true, NULL, 1, relIdKey);
|
||||||
HeapTuple timeseriesTuple = systable_getnext(timeseriesRelScan);
|
HeapTuple timeseriesTuple = systable_getnext(timeseriesRelScan);
|
||||||
|
|
||||||
if (HeapTupleIsValid(timeseriesTuple))
|
if (HeapTupleIsValid(timeseriesTuple))
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
/* timeseries--10.1-1--10.2-1.sql */
|
/* timeseries--10.1-1--10.2-1.sql */
|
||||||
|
|
||||||
CREATE SCHEMA citus_timeseries;
|
CREATE SCHEMA timeseries;
|
||||||
SET search_path TO citus_timeseries;
|
SET search_path TO timeseries;
|
||||||
|
|
||||||
CREATE TABLE citus_timeseries_tables (
|
CREATE TABLE tables (
|
||||||
logicalrelid regclass NOT NULL PRIMARY KEY,
|
logicalrelid regclass NOT NULL PRIMARY KEY,
|
||||||
partitioninterval INTERVAL NOT NULL,
|
partitioninterval INTERVAL NOT NULL,
|
||||||
postmakeintervalcount INT NOT NULL,
|
postmakeintervalcount INT NOT NULL,
|
||||||
|
@ -13,10 +13,10 @@ CREATE TABLE citus_timeseries_tables (
|
||||||
retentionthreshold INTERVAL
|
retentionthreshold INTERVAL
|
||||||
);
|
);
|
||||||
|
|
||||||
COMMENT ON TABLE citus_timeseries_tables IS 'Keeps interval and threshold informations for timeseries tables';
|
COMMENT ON TABLE tables IS 'Keeps interval and threshold informations for timeseries tables';
|
||||||
|
|
||||||
-- grant read access for timeseries metadata tables to unprivileged user
|
-- grant read access for timeseries metadata tables to unprivileged user
|
||||||
GRANT USAGE ON SCHEMA citus_timeseries TO PUBLIC;
|
GRANT USAGE ON SCHEMA timeseries TO PUBLIC;
|
||||||
GRANT SELECT ON ALL tables IN SCHEMA citus_timeseries TO PUBLIC;
|
GRANT SELECT ON ALL tables IN SCHEMA timeseries TO PUBLIC;
|
||||||
|
|
||||||
RESET search_path;
|
RESET search_path;
|
||||||
|
|
|
@ -24,7 +24,7 @@ BEGIN
|
||||||
|
|
||||||
SELECT partitioninterval
|
SELECT partitioninterval
|
||||||
INTO table_partition_interval
|
INTO table_partition_interval
|
||||||
FROM citus_timeseries.citus_timeseries_tables
|
FROM timeseries.tables
|
||||||
WHERE logicalrelid = table_name;
|
WHERE logicalrelid = table_name;
|
||||||
|
|
||||||
IF NOT found THEN
|
IF NOT found THEN
|
||||||
|
|
|
@ -24,7 +24,7 @@ BEGIN
|
||||||
|
|
||||||
SELECT partitioninterval
|
SELECT partitioninterval
|
||||||
INTO table_partition_interval
|
INTO table_partition_interval
|
||||||
FROM citus_timeseries.citus_timeseries_tables
|
FROM timeseries.tables
|
||||||
WHERE logicalrelid = table_name;
|
WHERE logicalrelid = table_name;
|
||||||
|
|
||||||
IF NOT found THEN
|
IF NOT found THEN
|
||||||
|
|
|
@ -23,16 +23,16 @@
|
||||||
#include "timeseries/timeseries_utils.h"
|
#include "timeseries/timeseries_utils.h"
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Get the relation id for citus_timeseries_tables metadata table
|
* Get the relation id for timeseries.tables metadata table
|
||||||
*/
|
*/
|
||||||
Oid
|
Oid
|
||||||
CitusTimeseriesTablesRelationId()
|
CitusTimeseriesTablesRelationId()
|
||||||
{
|
{
|
||||||
Oid relationId = get_relname_relid("citus_timeseries_tables",
|
Oid relationId = get_relname_relid("tables",
|
||||||
TimeseriesNamespaceId());
|
TimeseriesNamespaceId());
|
||||||
if (relationId == InvalidOid)
|
if (relationId == InvalidOid)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("cache lookup failed for citus_timeseries_tables,"
|
ereport(ERROR, (errmsg("cache lookup failed for timeseries tables,"
|
||||||
"called too early?")));
|
"called too early?")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,6 +40,16 @@ CitusTimeseriesTablesRelationId()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CitusTimeseriesTablesPKeyIndexRelationId returns relation id of timeseries.tables_pkey.
|
||||||
|
*/
|
||||||
|
Oid
|
||||||
|
CitusTimeseriesTablesPKeyIndexRelationId()
|
||||||
|
{
|
||||||
|
return get_relname_relid("tables_pkey", TimeseriesNamespaceId());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* TimeseriesNamespaceId returns namespace id of the schema we store timeseries
|
* TimeseriesNamespaceId returns namespace id of the schema we store timeseries
|
||||||
* related metadata tables.
|
* related metadata tables.
|
||||||
|
@ -47,7 +57,7 @@ CitusTimeseriesTablesRelationId()
|
||||||
Oid
|
Oid
|
||||||
TimeseriesNamespaceId()
|
TimeseriesNamespaceId()
|
||||||
{
|
{
|
||||||
return get_namespace_oid("citus_timeseries", false);
|
return get_namespace_oid("timeseries", false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -83,13 +93,9 @@ bool
|
||||||
CheckIntervalAlignnmentWithPartitionKey(PartitionKey partitionKey,
|
CheckIntervalAlignnmentWithPartitionKey(PartitionKey partitionKey,
|
||||||
Interval *partitionInterval)
|
Interval *partitionInterval)
|
||||||
{
|
{
|
||||||
Oid partTypeId;
|
Oid partTypeId = partitionKey->parttypid[0];
|
||||||
HeapTuple typeTuple;
|
HeapTuple typeTuple = SearchSysCache1(TYPEOID, partTypeId);
|
||||||
Form_pg_type typeForm;
|
Form_pg_type typeForm = (Form_pg_type) GETSTRUCT(typeTuple);
|
||||||
|
|
||||||
partTypeId = partitionKey->parttypid[0];
|
|
||||||
typeTuple = SearchSysCache1(TYPEOID, partTypeId);
|
|
||||||
typeForm = (Form_pg_type) GETSTRUCT(typeTuple);
|
|
||||||
ReleaseSysCache(typeTuple);
|
ReleaseSysCache(typeTuple);
|
||||||
|
|
||||||
if (strncmp(typeForm->typname.data, "date", NAMEDATALEN) == 0)
|
if (strncmp(typeForm->typname.data, "date", NAMEDATALEN) == 0)
|
||||||
|
|
|
@ -33,6 +33,7 @@
|
||||||
(ivp)->month * (30.0 * SECS_PER_DAY))
|
(ivp)->month * (30.0 * SECS_PER_DAY))
|
||||||
|
|
||||||
extern Oid CitusTimeseriesTablesRelationId(void);
|
extern Oid CitusTimeseriesTablesRelationId(void);
|
||||||
|
extern Oid CitusTimeseriesTablesPKeyIndexRelationId(void);
|
||||||
extern Oid TimeseriesNamespaceId(void);
|
extern Oid TimeseriesNamespaceId(void);
|
||||||
extern bool CheckIntervalAlignmentWithThresholds(Interval *partitionInterval,
|
extern bool CheckIntervalAlignmentWithThresholds(Interval *partitionInterval,
|
||||||
Interval *compressionThreshold,
|
Interval *compressionThreshold,
|
||||||
|
|
|
@ -380,14 +380,14 @@ SELECT create_timeseries_table('drop_check_test_partitioned_table', INTERVAL '2
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT * FROM citus_timeseries.citus_timeseries_tables;
|
SELECT * FROM timeseries.tables;
|
||||||
logicalrelid | partitioninterval | postmakeintervalcount | premakeintervalcount | startfrom | compressionthreshold | retentionthreshold
|
logicalrelid | partitioninterval | postmakeintervalcount | premakeintervalcount | startfrom | compressionthreshold | retentionthreshold
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
drop_check_test_partitioned_table | @ 2 hours | 7 | 7 | | |
|
drop_check_test_partitioned_table | @ 2 hours | 7 | 7 | | |
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
DROP TABLE drop_check_test_partitioned_table;
|
DROP TABLE drop_check_test_partitioned_table;
|
||||||
SELECT * FROM citus_timeseries.citus_timeseries_tables;
|
SELECT * FROM timeseries.tables;
|
||||||
logicalrelid | partitioninterval | postmakeintervalcount | premakeintervalcount | startfrom | compressionthreshold | retentionthreshold
|
logicalrelid | partitioninterval | postmakeintervalcount | premakeintervalcount | startfrom | compressionthreshold | retentionthreshold
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
@ -403,14 +403,14 @@ SELECT create_timeseries_table('drop_check_test_partitioned_table', INTERVAL '2
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT * FROM citus_timeseries.citus_timeseries_tables;
|
SELECT * FROM timeseries.tables;
|
||||||
logicalrelid | partitioninterval | postmakeintervalcount | premakeintervalcount | startfrom | compressionthreshold | retentionthreshold
|
logicalrelid | partitioninterval | postmakeintervalcount | premakeintervalcount | startfrom | compressionthreshold | retentionthreshold
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
drop_check_test_partitioned_table | @ 2 hours | 7 | 7 | | |
|
drop_check_test_partitioned_table | @ 2 hours | 7 | 7 | | |
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
DROP TABLE drop_check_test_partitioned_table;
|
DROP TABLE drop_check_test_partitioned_table;
|
||||||
SELECT * FROM citus_timeseries.citus_timeseries_tables;
|
SELECT * FROM timeseries.tables;
|
||||||
logicalrelid | partitioninterval | postmakeintervalcount | premakeintervalcount | startfrom | compressionthreshold | retentionthreshold
|
logicalrelid | partitioninterval | postmakeintervalcount | premakeintervalcount | startfrom | compressionthreshold | retentionthreshold
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
|
@ -230,7 +230,7 @@ ORDER BY 1;
|
||||||
sequence pg_dist_placement_placementid_seq
|
sequence pg_dist_placement_placementid_seq
|
||||||
sequence pg_dist_shardid_seq
|
sequence pg_dist_shardid_seq
|
||||||
table citus.pg_dist_object
|
table citus.pg_dist_object
|
||||||
table citus_timeseries.citus_timeseries_tables
|
table timeseries.tables
|
||||||
table columnar.chunk
|
table columnar.chunk
|
||||||
table columnar.chunk_group
|
table columnar.chunk_group
|
||||||
table columnar.options
|
table columnar.options
|
||||||
|
|
|
@ -224,9 +224,9 @@ CREATE TABLE drop_check_test_partitioned_table(
|
||||||
measure_data integer) PARTITION BY RANGE(eventdatetime);
|
measure_data integer) PARTITION BY RANGE(eventdatetime);
|
||||||
|
|
||||||
SELECT create_timeseries_table('drop_check_test_partitioned_table', INTERVAL '2 hours');
|
SELECT create_timeseries_table('drop_check_test_partitioned_table', INTERVAL '2 hours');
|
||||||
SELECT * FROM citus_timeseries.citus_timeseries_tables;
|
SELECT * FROM timeseries.tables;
|
||||||
DROP TABLE drop_check_test_partitioned_table;
|
DROP TABLE drop_check_test_partitioned_table;
|
||||||
SELECT * FROM citus_timeseries.citus_timeseries_tables;
|
SELECT * FROM timeseries.tables;
|
||||||
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
CREATE TABLE drop_check_test_partitioned_table(
|
CREATE TABLE drop_check_test_partitioned_table(
|
||||||
|
@ -234,7 +234,7 @@ BEGIN;
|
||||||
eventdatetime timestamp with time zone,
|
eventdatetime timestamp with time zone,
|
||||||
measure_data integer) PARTITION BY RANGE(eventdatetime);
|
measure_data integer) PARTITION BY RANGE(eventdatetime);
|
||||||
SELECT create_timeseries_table('drop_check_test_partitioned_table', INTERVAL '2 hours');
|
SELECT create_timeseries_table('drop_check_test_partitioned_table', INTERVAL '2 hours');
|
||||||
SELECT * FROM citus_timeseries.citus_timeseries_tables;
|
SELECT * FROM timeseries.tables;
|
||||||
DROP TABLE drop_check_test_partitioned_table;
|
DROP TABLE drop_check_test_partitioned_table;
|
||||||
SELECT * FROM citus_timeseries.citus_timeseries_tables;
|
SELECT * FROM timeseries.tables;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
|
@ -80,7 +80,7 @@ ROLLBACK;
|
||||||
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT create_timeseries_table('tstz_partitioned_table', INTERVAL '6 hours');
|
SELECT create_timeseries_table('tstz_partitioned_table', INTERVAL '6 hours');
|
||||||
SELECT
|
SELECT
|
||||||
date_trunc('hour', now()) - range_from_value::timestamp with time zone as from_diff,
|
date_trunc('hour', now()) - range_from_value::timestamp with time zone as from_diff,
|
||||||
date_trunc('hour', now()) - range_to_value::timestamp with time zone as to_diff
|
date_trunc('hour', now()) - range_to_value::timestamp with time zone as to_diff
|
||||||
FROM get_missing_partition_ranges('tstz_partitioned_table', now() + INTERVAL '1 day', now() - INTERVAL '1 day')
|
FROM get_missing_partition_ranges('tstz_partitioned_table', now() + INTERVAL '1 day', now() - INTERVAL '1 day')
|
||||||
|
|
Loading…
Reference in New Issue