mirror of https://github.com/citusdata/citus.git
Introduce drop_timeseries_table UDF to drop metadata
parent
993a2cbce9
commit
4754a86e31
|
@ -16,7 +16,9 @@ ALTER TABLE pg_catalog.pg_dist_placement ADD CONSTRAINT placement_shardid_groupi
|
||||||
#include "udfs/citus_internal_update_placement_metadata/10.2-1.sql";
|
#include "udfs/citus_internal_update_placement_metadata/10.2-1.sql";
|
||||||
#include "udfs/citus_internal_delete_shard_metadata/10.2-1.sql";
|
#include "udfs/citus_internal_delete_shard_metadata/10.2-1.sql";
|
||||||
#include "udfs/citus_internal_update_relation_colocation/10.2-1.sql";
|
#include "udfs/citus_internal_update_relation_colocation/10.2-1.sql";
|
||||||
|
#include "udfs/citus_drop_trigger/10.2-1.sql"
|
||||||
#include "../../timeseries/sql/timeseries--10.1-1--10.2-1.sql"
|
#include "../../timeseries/sql/timeseries--10.1-1--10.2-1.sql"
|
||||||
#include "../../timeseries/sql/udfs/create_timeseries_table/10.2-1.sql"
|
#include "../../timeseries/sql/udfs/create_timeseries_table/10.2-1.sql"
|
||||||
|
#include "../../timeseries/sql/udfs/drop_timeseries_table/10.2-1.sql"
|
||||||
#include "../../timeseries/sql/udfs/create_missing_partitions/10.2-1.sql"
|
#include "../../timeseries/sql/udfs/create_missing_partitions/10.2-1.sql"
|
||||||
#include "../../timeseries/sql/udfs/get_missing_partition_ranges/10.2-1.sql"
|
#include "../../timeseries/sql/udfs/get_missing_partition_ranges/10.2-1.sql"
|
||||||
|
|
|
@ -0,0 +1,50 @@
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.notify_constraint_dropped()
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'MODULE_PATHNAME', $$notify_constraint_dropped$$;
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.citus_drop_trigger()
|
||||||
|
RETURNS event_trigger
|
||||||
|
LANGUAGE plpgsql
|
||||||
|
SET search_path = pg_catalog
|
||||||
|
AS $cdbdt$
|
||||||
|
DECLARE
|
||||||
|
constraint_event_count INTEGER;
|
||||||
|
v_obj record;
|
||||||
|
sequence_names text[] := '{}';
|
||||||
|
table_colocation_id integer;
|
||||||
|
propagate_drop boolean := false;
|
||||||
|
BEGIN
|
||||||
|
FOR v_obj IN SELECT * FROM pg_event_trigger_dropped_objects()
|
||||||
|
WHERE object_type IN ('table', 'foreign table')
|
||||||
|
LOOP
|
||||||
|
-- first drop the table and metadata on the workers
|
||||||
|
-- then drop all the shards on the workers
|
||||||
|
-- finally remove the pg_dist_partition entry on the coordinator
|
||||||
|
PERFORM master_remove_distributed_table_metadata_from_workers(v_obj.objid, v_obj.schema_name, v_obj.object_name);
|
||||||
|
PERFORM citus_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name);
|
||||||
|
PERFORM master_remove_partition_metadata(v_obj.objid, v_obj.schema_name, v_obj.object_name);
|
||||||
|
PERFORM drop_timeseries_table(v_obj.objid);
|
||||||
|
END LOOP;
|
||||||
|
|
||||||
|
-- remove entries from citus.pg_dist_object for all dropped root (objsubid = 0) objects
|
||||||
|
FOR v_obj IN SELECT * FROM pg_event_trigger_dropped_objects()
|
||||||
|
LOOP
|
||||||
|
PERFORM master_unmark_object_distributed(v_obj.classid, v_obj.objid, v_obj.objsubid);
|
||||||
|
END LOOP;
|
||||||
|
|
||||||
|
SELECT COUNT(*) INTO constraint_event_count
|
||||||
|
FROM pg_event_trigger_dropped_objects()
|
||||||
|
WHERE object_type IN ('table constraint');
|
||||||
|
|
||||||
|
IF constraint_event_count > 0
|
||||||
|
THEN
|
||||||
|
-- Tell utility hook that a table constraint is dropped so we might
|
||||||
|
-- need to undistribute some of the citus local tables that are not
|
||||||
|
-- connected to any reference tables.
|
||||||
|
PERFORM notify_constraint_dropped();
|
||||||
|
END IF;
|
||||||
|
END;
|
||||||
|
$cdbdt$;
|
||||||
|
COMMENT ON FUNCTION pg_catalog.citus_drop_trigger()
|
||||||
|
IS 'perform checks and actions at the end of DROP actions';
|
|
@ -24,6 +24,7 @@ BEGIN
|
||||||
PERFORM master_remove_distributed_table_metadata_from_workers(v_obj.objid, v_obj.schema_name, v_obj.object_name);
|
PERFORM master_remove_distributed_table_metadata_from_workers(v_obj.objid, v_obj.schema_name, v_obj.object_name);
|
||||||
PERFORM citus_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name);
|
PERFORM citus_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name);
|
||||||
PERFORM master_remove_partition_metadata(v_obj.objid, v_obj.schema_name, v_obj.object_name);
|
PERFORM master_remove_partition_metadata(v_obj.objid, v_obj.schema_name, v_obj.object_name);
|
||||||
|
PERFORM drop_timeseries_table(v_obj.objid);
|
||||||
END LOOP;
|
END LOOP;
|
||||||
|
|
||||||
-- remove entries from citus.pg_dist_object for all dropped root (objsubid = 0) objects
|
-- remove entries from citus.pg_dist_object for all dropped root (objsubid = 0) objects
|
||||||
|
|
|
@ -32,8 +32,6 @@
|
||||||
#include "distributed/resource_lock.h"
|
#include "distributed/resource_lock.h"
|
||||||
#include "timeseries/timeseries_utils.h"
|
#include "timeseries/timeseries_utils.h"
|
||||||
|
|
||||||
#define Natts_citus_timeseries_tables 6
|
|
||||||
|
|
||||||
PG_FUNCTION_INFO_V1(create_timeseries_table);
|
PG_FUNCTION_INFO_V1(create_timeseries_table);
|
||||||
|
|
||||||
static void InitiateTimeseriesTablePartitions(Oid relationId);
|
static void InitiateTimeseriesTablePartitions(Oid relationId);
|
||||||
|
|
|
@ -0,0 +1,68 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* drop_timeseries_table.c
|
||||||
|
* Routines related to the drop of timeseries tables.
|
||||||
|
*
|
||||||
|
* Copyright (c) Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "postgres.h"
|
||||||
|
#include "miscadmin.h"
|
||||||
|
|
||||||
|
#include "access/genam.h"
|
||||||
|
#include "access/skey.h"
|
||||||
|
#include "access/xact.h"
|
||||||
|
#include "catalog/indexing.h"
|
||||||
|
#include "storage/lockdefs.h"
|
||||||
|
#include "utils/elog.h"
|
||||||
|
#include "utils/relcache.h"
|
||||||
|
#include "utils/fmgroids.h"
|
||||||
|
|
||||||
|
#include "distributed/metadata_cache.h"
|
||||||
|
#include "timeseries/timeseries_utils.h"
|
||||||
|
|
||||||
|
PG_FUNCTION_INFO_V1(drop_timeseries_table);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* drop_timeseries_table gets the table oid, then it drops
|
||||||
|
* all the metadata related to it. Note that this function doesn't
|
||||||
|
* drop any partitions or any data of the given table.
|
||||||
|
*
|
||||||
|
* TODO: Add unscheduling for automated jobs as well.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
drop_timeseries_table(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
|
if (PG_ARGISNULL(0))
|
||||||
|
{
|
||||||
|
PG_RETURN_VOID();
|
||||||
|
}
|
||||||
|
|
||||||
|
Oid relationId = PG_GETARG_OID(0);
|
||||||
|
|
||||||
|
ScanKeyData relIdKey[1];
|
||||||
|
Relation timeseriesRelation = table_open(CitusTimeseriesTablesRelationId(), AccessShareLock);
|
||||||
|
|
||||||
|
ScanKeyInit(&relIdKey[0],
|
||||||
|
Anum_citus_timeseries_table_relation_id,
|
||||||
|
BTEqualStrategyNumber, F_OIDEQ,
|
||||||
|
ObjectIdGetDatum(relationId));
|
||||||
|
|
||||||
|
SysScanDesc timeseriesRelScan = systable_beginscan(timeseriesRelation, InvalidOid, false, NULL, 1, relIdKey);
|
||||||
|
HeapTuple timeseriesTuple = systable_getnext(timeseriesRelScan);
|
||||||
|
|
||||||
|
if (HeapTupleIsValid(timeseriesTuple))
|
||||||
|
{
|
||||||
|
CatalogTupleDelete(timeseriesRelation, ×eriesTuple->t_self);
|
||||||
|
CommandCounterIncrement();
|
||||||
|
}
|
||||||
|
|
||||||
|
systable_endscan(timeseriesRelScan);
|
||||||
|
table_close(timeseriesRelation, NoLock);
|
||||||
|
|
||||||
|
PG_RETURN_VOID();
|
||||||
|
}
|
|
@ -0,0 +1,10 @@
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.drop_timeseries_table(
|
||||||
|
logicalrelid regclass)
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C
|
||||||
|
AS 'MODULE_PATHNAME', 'drop_timeseries_table';
|
||||||
|
|
||||||
|
COMMENT ON FUNCTION pg_catalog.drop_timeseries_table(
|
||||||
|
logicalrelid regclass)
|
||||||
|
IS 'drops a citus timeseries table by removing metadata';
|
||||||
|
-- TODO: Update comment for unscheduling
|
|
@ -0,0 +1,10 @@
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.drop_timeseries_table(
|
||||||
|
logicalrelid regclass)
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C
|
||||||
|
AS 'MODULE_PATHNAME', 'drop_timeseries_table';
|
||||||
|
|
||||||
|
COMMENT ON FUNCTION pg_catalog.drop_timeseries_table(
|
||||||
|
logicalrelid regclass)
|
||||||
|
IS 'drops a citus timeseries table by removing metadata';
|
||||||
|
-- TODO: Update comment for unscheduling
|
|
@ -22,11 +22,6 @@
|
||||||
|
|
||||||
#include "timeseries/timeseries_utils.h"
|
#include "timeseries/timeseries_utils.h"
|
||||||
|
|
||||||
#define INTERVAL_TO_SEC(ivp) \
|
|
||||||
(((double) (ivp)->time) / ((double) USECS_PER_SEC) + \
|
|
||||||
(ivp)->day * (24.0 * SECS_PER_HOUR) + \
|
|
||||||
(ivp)->month * (30.0 * SECS_PER_DAY))
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Get the relation id for citus_timeseries_tables metadata table
|
* Get the relation id for citus_timeseries_tables metadata table
|
||||||
*/
|
*/
|
||||||
|
@ -37,8 +32,8 @@ CitusTimeseriesTablesRelationId()
|
||||||
TimeseriesNamespaceId());
|
TimeseriesNamespaceId());
|
||||||
if (relationId == InvalidOid)
|
if (relationId == InvalidOid)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg(
|
ereport(ERROR, (errmsg("cache lookup failed for citus_timeseries_tables,"
|
||||||
"cache lookup failed for citus_timeseries_tables, called too early?")));
|
"called too early?")));
|
||||||
}
|
}
|
||||||
|
|
||||||
return relationId;
|
return relationId;
|
||||||
|
|
|
@ -16,6 +16,19 @@
|
||||||
#include "server/datatype/timestamp.h"
|
#include "server/datatype/timestamp.h"
|
||||||
#include "server/partitioning/partdefs.h"
|
#include "server/partitioning/partdefs.h"
|
||||||
|
|
||||||
|
#define Natts_citus_timeseries_tables 6
|
||||||
|
#define Anum_citus_timeseries_table_relation_id 1
|
||||||
|
#define Anum_citus_timeseries_table_partition_interval 2
|
||||||
|
#define Anum_citus_timeseries_table_premake_interval_count 3
|
||||||
|
#define Anum_citus_timeseries_table_postmake_interval_count 4
|
||||||
|
#define Anum_citus_timeseries_table_compression_threshold 5
|
||||||
|
#define Anum_citus_timeseries_table_retention_threshold 6
|
||||||
|
|
||||||
|
#define INTERVAL_TO_SEC(ivp) \
|
||||||
|
(((double) (ivp)->time) / ((double) USECS_PER_SEC) + \
|
||||||
|
(ivp)->day * (24.0 * SECS_PER_HOUR) + \
|
||||||
|
(ivp)->month * (30.0 * SECS_PER_DAY))
|
||||||
|
|
||||||
extern Oid CitusTimeseriesTablesRelationId(void);
|
extern Oid CitusTimeseriesTablesRelationId(void);
|
||||||
extern Oid TimeseriesNamespaceId(void);
|
extern Oid TimeseriesNamespaceId(void);
|
||||||
extern bool CheckIntervalAlignmentWithThresholds(Interval *partitionInterval,
|
extern bool CheckIntervalAlignmentWithThresholds(Interval *partitionInterval,
|
||||||
|
|
Loading…
Reference in New Issue