Clean-up resources on drop

merge-cstore-pykello
Hadi Moshayedi 2020-09-26 12:39:16 -07:00
parent cf0ba6103e
commit d37c717e14
8 changed files with 138 additions and 50 deletions

View File

@ -16,9 +16,13 @@
#include <sys/stat.h>
#include <unistd.h>
#include "access/heapam.h"
#include "catalog/objectaccess.h"
#include "catalog/storage.h"
#include "miscadmin.h"
#include "utils/guc.h"
#include "utils/rel.h"
#include "utils/relcache.h"
#include "cstore.h"
@ -38,6 +42,11 @@ static const struct config_enum_entry cstore_compression_options[] =
{ NULL, 0, false }
};
static object_access_hook_type prevObjectAccess = NULL;
static void ObjectAccess(ObjectAccessType access, Oid classId, Oid objectId, int subId,
void *arg);
void
cstore_init()
{
@ -78,6 +87,9 @@ cstore_init()
NULL,
NULL,
NULL);
prevObjectAccess = object_access_hook;
object_access_hook = ObjectAccess;
}
@ -110,3 +122,71 @@ InitializeCStoreTableFile(Oid relNode, CStoreOptions *cstoreOptions)
{
InitCStoreTableMetadata(relNode, cstoreOptions->blockRowCount);
}
/*
* Implements object_access_hook. One of the places this is called is just
* before dropping an object, which allows us to clean-up resources for
* cstore tables while the pg_class record for the table is still there.
*/
static void
ObjectAccess(ObjectAccessType access, Oid classId, Oid objectId, int subId, void *arg)
{
if (prevObjectAccess)
{
prevObjectAccess(access, classId, objectId, subId, arg);
}
/*
* Do nothing if this is not a DROP relation command.
*/
if (access != OAT_DROP || classId != RelationRelationId || OidIsValid(subId))
{
return;
}
if (IsCStoreFdwTable(objectId))
{
/*
* Drop both metadata and storage. We need to drop storage here since
* we manage relfilenode for FDW tables in the extension.
*/
Relation rel = cstore_fdw_open(objectId, AccessExclusiveLock);
RelationOpenSmgr(rel);
RelationDropStorage(rel);
DeleteTableMetadataRowIfExists(rel->rd_node.relNode);
/* keep the lock since we did physical changes to the relation */
relation_close(rel, NoLock);
}
else
{
Oid relNode = InvalidOid;
Relation rel = try_relation_open(objectId, AccessExclusiveLock);
if (rel == NULL)
{
return;
}
relNode = rel->rd_node.relNode;
if (IsCStoreStorage(relNode))
{
/*
* Drop only metadata for table am cstore tables. Postgres manages
* storage for these tables, so we don't need to drop that.
*/
DeleteTableMetadataRowIfExists(relNode);
/* keep the lock since we did physical changes to the relation */
relation_close(rel, NoLock);
}
else
{
/*
* For non-cstore tables, we do nothing.
* Release the lock since we haven't changed the relation.
*/
relation_close(rel, AccessExclusiveLock);
}
}
}

View File

@ -283,6 +283,7 @@ extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer,
extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType);
/* cstore_metadata_tables.c */
extern bool IsCStoreStorage(Oid relfilenode);
extern void DeleteTableMetadataRowIfExists(Oid relfilenode);
extern void InitCStoreTableMetadata(Oid relfilenode, int blockRowCount);
extern void InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe);

View File

@ -126,7 +126,6 @@ static uint64 CopyIntoCStoreTable(const CopyStmt *copyStatement,
const char *queryString);
static uint64 CopyOutCStoreTable(CopyStmt *copyStatement, const char *queryString);
static void CStoreProcessAlterTableCommand(AlterTableStmt *alterStatement);
static List * DroppedCStoreRelidList(DropStmt *dropStatement);
static List * FindCStoreTables(List *tableList);
static List * OpenRelationsForTruncate(List *cstoreTableList);
static void FdwNewRelFileNode(Relation relation);
@ -315,25 +314,6 @@ CStoreProcessUtility(Node * parseTree, const char * queryString,
destReceiver, completionTag);
}
}
else if (nodeTag(parseTree) == T_DropStmt)
{
List *dropRelids = DroppedCStoreRelidList((DropStmt *) parseTree);
ListCell *lc = NULL;
/* drop smgr storage */
foreach(lc, dropRelids)
{
Oid relid = lfirst_oid(lc);
Relation relation = cstore_fdw_open(relid, AccessExclusiveLock);
RelationOpenSmgr(relation);
RelationDropStorage(relation);
heap_close(relation, AccessExclusiveLock);
}
CALL_PREVIOUS_UTILITY(parseTree, queryString, context, paramListInfo,
destReceiver, completionTag);
}
else if (nodeTag(parseTree) == T_TruncateStmt)
{
TruncateStmt *truncateStatement = (TruncateStmt *) parseTree;
@ -723,36 +703,6 @@ CStoreProcessAlterTableCommand(AlterTableStmt *alterStatement)
}
/*
* DropppedCStoreRelidList extracts and returns the list of cstore relids
* from DROP table statement
*/
static List *
DroppedCStoreRelidList(DropStmt *dropStatement)
{
List *droppedCStoreRelidList = NIL;
if (dropStatement->removeType == OBJECT_FOREIGN_TABLE)
{
ListCell *dropObjectCell = NULL;
foreach(dropObjectCell, dropStatement->objects)
{
List *tableNameList = (List *) lfirst(dropObjectCell);
RangeVar *rangeVar = makeRangeVarFromNameList(tableNameList);
Oid relationId = RangeVarGetRelid(rangeVar, AccessShareLock, true);
if (CStoreTable(relationId))
{
droppedCStoreRelidList = lappend_oid(droppedCStoreRelidList,
relationId);
}
}
}
return droppedCStoreRelidList;
}
/* FindCStoreTables returns list of CStore tables from given table list */
static List *
FindCStoreTables(List *tableList)

View File

@ -94,6 +94,17 @@ static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm);
#define Anum_cstore_skipnodes_value_compression_type 12
/*
* IsCStoreStorage returns if relfilenode belongs to a cstore table.
*/
bool
IsCStoreStorage(Oid relfilenode)
{
uint64 blockRowCount = 0;
return ReadCStoreTables(relfilenode, &blockRowCount);
}
/*
* InitCStoreTableMetadata adds a record for the given relation in cstore_table.
*/

View File

@ -12,14 +12,29 @@
-- 'postgres' directory is excluded from comparison to have the same result.
-- store postgres database oid
SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset
SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset
-- DROP cstore_fdw tables
DROP TABLE contestant;
DROP TABLE contestant_compressed;
-- make sure DROP deletes metadata
SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables;
?column?
----------
2
(1 row)
-- Create a cstore_fdw table under a schema and drop it.
CREATE SCHEMA test_schema;
CREATE TABLE test_schema.test_table(data int) USING cstore_tableam;
SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset
DROP SCHEMA test_schema CASCADE;
NOTICE: drop cascades to table test_schema.test_table
SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables;
?column?
----------
1
(1 row)
SELECT current_database() datname \gset
CREATE DATABASE db_to_drop;
\c db_to_drop

View File

@ -12,14 +12,29 @@
-- 'postgres' directory is excluded from comparison to have the same result.
-- store postgres database oid
SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset
SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset
-- DROP cstore_fdw tables
DROP FOREIGN TABLE contestant;
DROP FOREIGN TABLE contestant_compressed;
-- make sure DROP deletes metadata
SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables;
?column?
----------
2
(1 row)
-- Create a cstore_fdw table under a schema and drop it.
CREATE SCHEMA test_schema;
CREATE FOREIGN TABLE test_schema.test_table(data int) SERVER cstore_server;
SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset
DROP SCHEMA test_schema CASCADE;
NOTICE: drop cascades to foreign table test_schema.test_table
SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables;
?column?
----------
1
(1 row)
SELECT current_database() datname \gset
CREATE DATABASE db_to_drop;
\c db_to_drop

View File

@ -15,14 +15,22 @@
-- store postgres database oid
SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset
SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset
-- DROP cstore_fdw tables
DROP TABLE contestant;
DROP TABLE contestant_compressed;
-- make sure DROP deletes metadata
SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables;
-- Create a cstore_fdw table under a schema and drop it.
CREATE SCHEMA test_schema;
CREATE TABLE test_schema.test_table(data int) USING cstore_tableam;
SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset
DROP SCHEMA test_schema CASCADE;
SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables;
SELECT current_database() datname \gset

View File

@ -15,14 +15,22 @@
-- store postgres database oid
SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset
SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset
-- DROP cstore_fdw tables
DROP FOREIGN TABLE contestant;
DROP FOREIGN TABLE contestant_compressed;
-- make sure DROP deletes metadata
SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables;
-- Create a cstore_fdw table under a schema and drop it.
CREATE SCHEMA test_schema;
CREATE FOREIGN TABLE test_schema.test_table(data int) SERVER cstore_server;
SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset
DROP SCHEMA test_schema CASCADE;
SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables;
SELECT current_database() datname \gset