From cf0ba6103ed1d8e91a6976b5c001be5d8a7b6f7e Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Sat, 26 Sep 2020 12:23:14 -0700 Subject: [PATCH 1/4] Associate metadata with rel filenode --- cstore.c | 23 ++------- cstore.h | 26 +++++----- cstore_fdw--1.7.sql | 16 +++--- cstore_fdw.c | 47 +++++++++--------- cstore_metadata_tables.c | 103 +++++++++++++++++++++++++-------------- cstore_reader.c | 11 +++-- cstore_tableam.c | 13 ++--- cstore_writer.c | 16 +++--- 8 files changed, 134 insertions(+), 121 deletions(-) diff --git a/cstore.c b/cstore.c index f5846a029..d6b6751e2 100644 --- a/cstore.c +++ b/cstore.c @@ -102,26 +102,11 @@ ParseCompressionType(const char *compressionTypeString) /* - * InitializeCStoreTableFile creates data and footer file for a cstore table. - * The function assumes data and footer files do not exist, therefore - * it should be called on empty or non-existing table. Notice that the caller - * is expected to acquire AccessExclusiveLock on the relation. + * InitializeCStoreTableFile initializes metadata for the given relation + * file node. */ void -InitializeCStoreTableFile(Oid relationId, Relation relation, CStoreOptions *cstoreOptions) +InitializeCStoreTableFile(Oid relNode, CStoreOptions *cstoreOptions) { - TableWriteState *writeState = NULL; - TupleDesc tupleDescriptor = RelationGetDescr(relation); - - InitCStoreTableMetadata(relationId, cstoreOptions->blockRowCount); - - /* - * Initialize state to write to the cstore file. This creates an - * empty data file and a valid footer file for the table. - */ - writeState = CStoreBeginWrite(relationId, - cstoreOptions->compressionType, - cstoreOptions->stripeRowCount, - cstoreOptions->blockRowCount, tupleDescriptor); - CStoreEndWrite(writeState); + InitCStoreTableMetadata(relNode, cstoreOptions->blockRowCount); } diff --git a/cstore.h b/cstore.h index 96fa1ed53..dd5f9e6e1 100644 --- a/cstore.h +++ b/cstore.h @@ -16,7 +16,9 @@ #include "fmgr.h" #include "lib/stringinfo.h" +#include "nodes/parsenodes.h" #include "storage/bufpage.h" +#include "storage/lockdefs.h" #include "utils/relcache.h" /* Defines for valid option names */ @@ -190,8 +192,6 @@ typedef struct StripeBuffers /* TableReadState represents state of a cstore file read operation. */ typedef struct TableReadState { - Oid relationId; - TableMetadata *tableMetadata; StripeMetadata *currentStripeMetadata; TupleDesc tupleDescriptor; @@ -217,7 +217,6 @@ typedef struct TableReadState /* TableWriteState represents state of a cstore file write operation. */ typedef struct TableWriteState { - Oid relationId; TableMetadata *tableMetadata; CompressionType compressionType; TupleDesc tupleDescriptor; @@ -249,11 +248,12 @@ extern int cstore_block_row_count; extern void cstore_init(void); extern CompressionType ParseCompressionType(const char *compressionTypeString); -extern void InitializeCStoreTableFile(Oid relationId, Relation relation, - CStoreOptions *cstoreOptions); +extern void InitializeCStoreTableFile(Oid relNode, CStoreOptions *cstoreOptions); +extern bool IsCStoreFdwTable(Oid relationId); +extern Relation cstore_fdw_open(Oid relationId, LOCKMODE lockmode); /* Function declarations for writing to a cstore file */ -extern TableWriteState * CStoreBeginWrite(Oid relationId, +extern TableWriteState * CStoreBeginWrite(Relation relation, CompressionType compressionType, uint64 stripeMaxRowCount, uint32 blockRowCount, @@ -263,7 +263,7 @@ extern void CStoreWriteRow(TableWriteState *state, Datum *columnValues, extern void CStoreEndWrite(TableWriteState *state); /* Function declarations for reading from a cstore file */ -extern TableReadState * CStoreBeginRead(Oid relationId, +extern TableReadState * CStoreBeginRead(Relation relation, TupleDesc tupleDescriptor, List *projectedColumnList, List *qualConditions); extern bool CStoreReadFinished(TableReadState *state); @@ -283,12 +283,14 @@ extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer, extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType); /* cstore_metadata_tables.c */ -extern void InitCStoreTableMetadata(Oid relid, int blockRowCount); -extern void InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe); -extern TableMetadata * ReadTableMetadata(Oid relid); -extern void SaveStripeSkipList(Oid relid, uint64 stripe, StripeSkipList *stripeSkipList, +extern void DeleteTableMetadataRowIfExists(Oid relfilenode); +extern void InitCStoreTableMetadata(Oid relfilenode, int blockRowCount); +extern void InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe); +extern TableMetadata * ReadTableMetadata(Oid relfilenode); +extern void SaveStripeSkipList(Oid relfilenode, uint64 stripe, + StripeSkipList *stripeSkipList, TupleDesc tupleDescriptor); -extern StripeSkipList * ReadStripeSkipList(Oid relid, uint64 stripe, +extern StripeSkipList * ReadStripeSkipList(Oid relfilenode, uint64 stripe, TupleDesc tupleDescriptor, uint32 blockCount); diff --git a/cstore_fdw--1.7.sql b/cstore_fdw--1.7.sql index fa8b558e0..b3470b6a5 100644 --- a/cstore_fdw--1.7.sql +++ b/cstore_fdw--1.7.sql @@ -32,17 +32,17 @@ AS 'MODULE_PATHNAME' LANGUAGE C STRICT; CREATE TABLE cstore_tables ( - relid oid NOT NULL, + relfilenode oid NOT NULL, block_row_count int NOT NULL, version_major bigint NOT NULL, version_minor bigint NOT NULL, - PRIMARY KEY (relid) + PRIMARY KEY (relfilenode) ) WITH (user_catalog_table = true); COMMENT ON TABLE cstore_tables IS 'CStore table wide metadata'; CREATE TABLE cstore_stripes ( - relid oid NOT NULL, + relfilenode oid NOT NULL, stripe bigint NOT NULL, file_offset bigint NOT NULL, data_length bigint NOT NULL, @@ -50,14 +50,14 @@ CREATE TABLE cstore_stripes ( block_count int NOT NULL, block_row_count int NOT NULL, row_count bigint NOT NULL, - PRIMARY KEY (relid, stripe), - FOREIGN KEY (relid) REFERENCES cstore_tables(relid) ON DELETE CASCADE INITIALLY DEFERRED + PRIMARY KEY (relfilenode, stripe), + FOREIGN KEY (relfilenode) REFERENCES cstore_tables(relfilenode) ON DELETE CASCADE INITIALLY DEFERRED ) WITH (user_catalog_table = true); COMMENT ON TABLE cstore_tables IS 'CStore per stripe metadata'; CREATE TABLE cstore_skipnodes ( - relid oid NOT NULL, + relfilenode oid NOT NULL, stripe bigint NOT NULL, attr int NOT NULL, block int NOT NULL, @@ -69,8 +69,8 @@ CREATE TABLE cstore_skipnodes ( exists_stream_offset bigint NOT NULL, exists_stream_length bigint NOT NULL, value_compression_type int NOT NULL, - PRIMARY KEY (relid, stripe, attr, block), - FOREIGN KEY (relid, stripe) REFERENCES cstore_stripes(relid, stripe) ON DELETE CASCADE INITIALLY DEFERRED + PRIMARY KEY (relfilenode, stripe, attr, block), + FOREIGN KEY (relfilenode, stripe) REFERENCES cstore_stripes(relfilenode, stripe) ON DELETE CASCADE INITIALLY DEFERRED ) WITH (user_catalog_table = true); COMMENT ON TABLE cstore_tables IS 'CStore per block metadata'; diff --git a/cstore_fdw.c b/cstore_fdw.c index 9a8882697..f9f886f79 100644 --- a/cstore_fdw.c +++ b/cstore_fdw.c @@ -131,7 +131,6 @@ static List * FindCStoreTables(List *tableList); static List * OpenRelationsForTruncate(List *cstoreTableList); static void FdwNewRelFileNode(Relation relation); static void TruncateCStoreTables(List *cstoreRelationList); -static bool CStoreTable(Oid relationId); static bool CStoreServer(ForeignServer *server); static bool DistributedTable(Oid relationId); static bool DistributedWorkerCopy(CopyStmt *copyStatement); @@ -189,7 +188,6 @@ static bool CStoreIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte); #endif static void cstore_fdw_initrel(Relation rel); -static Relation cstore_fdw_open(Oid relationId, LOCKMODE lockmode); static Relation cstore_fdw_openrv(RangeVar *relation, LOCKMODE lockmode); PG_FUNCTION_INFO_V1(cstore_ddl_event_end_trigger); @@ -267,7 +265,8 @@ cstore_ddl_event_end_trigger(PG_FUNCTION_ARGS) * We have no chance to hook into server creation to create data * directory for it during database creation time. */ - InitializeCStoreTableFile(relationId, relation, CStoreGetOptions(relationId)); + InitializeCStoreTableFile(relation->rd_node.relNode, + CStoreGetOptions(relationId)); heap_close(relation, AccessExclusiveLock); } } @@ -403,7 +402,7 @@ CopyCStoreTableStatement(CopyStmt *copyStatement) { Oid relationId = RangeVarGetRelid(copyStatement->relation, AccessShareLock, true); - bool cstoreTable = CStoreTable(relationId); + bool cstoreTable = IsCStoreFdwTable(relationId); if (cstoreTable) { bool distributedTable = DistributedTable(relationId); @@ -558,12 +557,11 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString) #endif /* init state to write to the cstore file */ - writeState = CStoreBeginWrite(relationId, + writeState = CStoreBeginWrite(relation, cstoreOptions->compressionType, cstoreOptions->stripeRowCount, cstoreOptions->blockRowCount, tupleDescriptor); - writeState->relation = relation; while (nextRowFound) { @@ -686,7 +684,7 @@ CStoreProcessAlterTableCommand(AlterTableStmt *alterStatement) } relationId = RangeVarGetRelid(relationRangeVar, AccessShareLock, true); - if (!CStoreTable(relationId)) + if (!IsCStoreFdwTable(relationId)) { return; } @@ -765,7 +763,7 @@ FindCStoreTables(List *tableList) { RangeVar *rangeVar = (RangeVar *) lfirst(relationCell); Oid relationId = RangeVarGetRelid(rangeVar, AccessShareLock, true); - if (CStoreTable(relationId) && !DistributedTable(relationId)) + if (IsCStoreFdwTable(relationId) && !DistributedTable(relationId)) { cstoreTableList = lappend(cstoreTableList, rangeVar); } @@ -825,10 +823,11 @@ TruncateCStoreTables(List *cstoreRelationList) Relation relation = (Relation) lfirst(relationCell); Oid relationId = relation->rd_id; - Assert(CStoreTable(relationId)); + Assert(IsCStoreFdwTable(relationId)); FdwNewRelFileNode(relation); - InitializeCStoreTableFile(relationId, relation, CStoreGetOptions(relationId)); + InitializeCStoreTableFile(relation->rd_node.relNode, + CStoreGetOptions(relationId)); } } @@ -861,7 +860,6 @@ FdwNewRelFileNode(Relation relation) Relation tmprel; Oid tablespace; Oid filenode; - RelFileNode newrnode; /* * Upgrade to AccessExclusiveLock, and hold until the end of the @@ -887,10 +885,6 @@ FdwNewRelFileNode(Relation relation) filenode = GetNewRelFileNode(tablespace, NULL, persistence); - newrnode.spcNode = tablespace; - newrnode.dbNode = MyDatabaseId; - newrnode.relNode = filenode; - classform->relfilenode = filenode; classform->relpages = 0; /* it's empty until further notice */ classform->reltuples = 0; @@ -900,6 +894,10 @@ FdwNewRelFileNode(Relation relation) CatalogTupleUpdate(pg_class, &tuple->t_self, tuple); CommandCounterIncrement(); + + relation->rd_node.spcNode = tablespace; + relation->rd_node.dbNode = MyDatabaseId; + relation->rd_node.relNode = filenode; } heap_freetuple(tuple); @@ -928,11 +926,11 @@ FdwCreateStorage(Relation relation) /* - * CStoreTable checks if the given table name belongs to a foreign columnar store + * IsCStoreFdwTable checks if the given table name belongs to a foreign columnar store * table. If it does, the function returns true. Otherwise, it returns false. */ -static bool -CStoreTable(Oid relationId) +bool +IsCStoreFdwTable(Oid relationId) { bool cstoreTable = false; char relationKind = 0; @@ -1055,7 +1053,7 @@ Datum cstore_table_size(PG_FUNCTION_ARGS) { Oid relationId = PG_GETARG_OID(0); - bool cstoreTable = CStoreTable(relationId); + bool cstoreTable = IsCStoreFdwTable(relationId); Relation relation; BlockNumber nblocks; @@ -1705,6 +1703,7 @@ CStoreBeginForeignScan(ForeignScanState *scanState, int executorFlags) ForeignScan *foreignScan = NULL; List *foreignPrivateList = NIL; List *whereClauseList = NIL; + Relation relation = NULL; cstore_fdw_initrel(currentRelation); @@ -1721,9 +1720,8 @@ CStoreBeginForeignScan(ForeignScanState *scanState, int executorFlags) whereClauseList = foreignScan->scan.plan.qual; columnList = (List *) linitial(foreignPrivateList); - readState = CStoreBeginRead(foreignTableId, - tupleDescriptor, columnList, whereClauseList); - readState->relation = cstore_fdw_open(foreignTableId, AccessShareLock); + relation = cstore_fdw_open(foreignTableId, AccessShareLock); + readState = CStoreBeginRead(relation, tupleDescriptor, columnList, whereClauseList); scanState->fdw_state = (void *) readState; } @@ -2067,13 +2065,12 @@ CStoreBeginForeignInsert(ModifyTableState *modifyTableState, ResultRelInfo *rela cstoreOptions = CStoreGetOptions(foreignTableOid); tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc); - writeState = CStoreBeginWrite(foreignTableOid, + writeState = CStoreBeginWrite(relation, cstoreOptions->compressionType, cstoreOptions->stripeRowCount, cstoreOptions->blockRowCount, tupleDescriptor); - writeState->relation = relation; relationInfo->ri_FdwState = (void *) writeState; } @@ -2196,7 +2193,7 @@ cstore_fdw_initrel(Relation rel) } -static Relation +Relation cstore_fdw_open(Oid relationId, LOCKMODE lockmode) { Relation rel = heap_open(relationId, lockmode); diff --git a/cstore_metadata_tables.c b/cstore_metadata_tables.c index 690e9eba9..4459d3009 100644 --- a/cstore_metadata_tables.c +++ b/cstore_metadata_tables.c @@ -50,8 +50,7 @@ static Oid CStoreTablesIndexRelationId(void); static Oid CStoreSkipNodesRelationId(void); static Oid CStoreSkipNodesIndexRelationId(void); static Oid CStoreNamespaceId(void); -static int TableBlockRowCount(Oid relid); -static void DeleteTableMetadataRowIfExists(Oid relid); +static bool ReadCStoreTables(Oid relfilenode, uint64 *blockRowCount); static ModifyState * StartModifyRelation(Relation rel); static void InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values, bool *nulls); @@ -63,14 +62,14 @@ static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm); /* constants for cstore_table */ #define Natts_cstore_tables 4 -#define Anum_cstore_tables_relid 1 +#define Anum_cstore_tables_relfilenode 1 #define Anum_cstore_tables_block_row_count 2 #define Anum_cstore_tables_version_major 3 #define Anum_cstore_tables_version_minor 4 /* constants for cstore_stripe */ #define Natts_cstore_stripes 8 -#define Anum_cstore_stripes_relid 1 +#define Anum_cstore_stripes_relfilenode 1 #define Anum_cstore_stripes_stripe 2 #define Anum_cstore_stripes_file_offset 3 #define Anum_cstore_stripes_data_length 4 @@ -81,7 +80,7 @@ static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm); /* constants for cstore_skipnodes */ #define Natts_cstore_skipnodes 12 -#define Anum_cstore_skipnodes_relid 1 +#define Anum_cstore_skipnodes_relfilenode 1 #define Anum_cstore_skipnodes_stripe 2 #define Anum_cstore_skipnodes_attr 3 #define Anum_cstore_skipnodes_block 4 @@ -99,7 +98,7 @@ static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm); * InitCStoreTableMetadata adds a record for the given relation in cstore_table. */ void -InitCStoreTableMetadata(Oid relid, int blockRowCount) +InitCStoreTableMetadata(Oid relfilenode, int blockRowCount) { Oid cstoreTablesOid = InvalidOid; Relation cstoreTables = NULL; @@ -107,13 +106,13 @@ InitCStoreTableMetadata(Oid relid, int blockRowCount) bool nulls[Natts_cstore_tables] = { 0 }; Datum values[Natts_cstore_tables] = { - ObjectIdGetDatum(relid), + ObjectIdGetDatum(relfilenode), Int32GetDatum(blockRowCount), Int32GetDatum(CSTORE_VERSION_MAJOR), Int32GetDatum(CSTORE_VERSION_MINOR) }; - DeleteTableMetadataRowIfExists(relid); + DeleteTableMetadataRowIfExists(relfilenode); cstoreTablesOid = CStoreTablesRelationId(); cstoreTables = heap_open(cstoreTablesOid, RowExclusiveLock); @@ -133,7 +132,7 @@ InitCStoreTableMetadata(Oid relid, int blockRowCount) * of cstore_skipnodes. */ void -SaveStripeSkipList(Oid relid, uint64 stripe, StripeSkipList *stripeSkipList, +SaveStripeSkipList(Oid relfilenode, uint64 stripe, StripeSkipList *stripeSkipList, TupleDesc tupleDescriptor) { uint32 columnIndex = 0; @@ -155,7 +154,7 @@ SaveStripeSkipList(Oid relid, uint64 stripe, StripeSkipList *stripeSkipList, &stripeSkipList->blockSkipNodeArray[columnIndex][blockIndex]; Datum values[Natts_cstore_skipnodes] = { - ObjectIdGetDatum(relid), + ObjectIdGetDatum(relfilenode), Int64GetDatum(stripe), Int32GetDatum(columnIndex + 1), Int32GetDatum(blockIndex), @@ -201,7 +200,7 @@ SaveStripeSkipList(Oid relid, uint64 stripe, StripeSkipList *stripeSkipList, * ReadStripeSkipList fetches StripeSkipList for a given stripe. */ StripeSkipList * -ReadStripeSkipList(Oid relid, uint64 stripe, TupleDesc tupleDescriptor, +ReadStripeSkipList(Oid relfilenode, uint64 stripe, TupleDesc tupleDescriptor, uint32 blockCount) { StripeSkipList *skipList = NULL; @@ -218,8 +217,8 @@ ReadStripeSkipList(Oid relid, uint64 stripe, TupleDesc tupleDescriptor, cstoreSkipNodes = heap_open(cstoreSkipNodesOid, AccessShareLock); index = index_open(CStoreSkipNodesIndexRelationId(), AccessShareLock); - ScanKeyInit(&scanKey[0], Anum_cstore_skipnodes_relid, - BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relid)); + ScanKeyInit(&scanKey[0], Anum_cstore_skipnodes_relfilenode, + BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relfilenode)); ScanKeyInit(&scanKey[1], Anum_cstore_skipnodes_stripe, BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe)); @@ -311,11 +310,11 @@ ReadStripeSkipList(Oid relid, uint64 stripe, TupleDesc tupleDescriptor, * InsertStripeMetadataRow adds a row to cstore_stripes. */ void -InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe) +InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe) { bool nulls[Natts_cstore_stripes] = { 0 }; Datum values[Natts_cstore_stripes] = { - ObjectIdGetDatum(relid), + ObjectIdGetDatum(relfilenode), Int64GetDatum(stripe->id), Int64GetDatum(stripe->fileOffset), Int64GetDatum(stripe->dataLength), @@ -339,11 +338,11 @@ InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe) /* - * ReadTableMetadata constructs TableMetadata for a given relid by reading + * ReadTableMetadata constructs TableMetadata for a given relfilenode by reading * from cstore_tables and cstore_stripes. */ TableMetadata * -ReadTableMetadata(Oid relid) +ReadTableMetadata(Oid relfilenode) { Oid cstoreStripesOid = InvalidOid; Relation cstoreStripes = NULL; @@ -352,12 +351,18 @@ ReadTableMetadata(Oid relid) ScanKeyData scanKey[1]; SysScanDesc scanDescriptor = NULL; HeapTuple heapTuple; + bool found = false; TableMetadata *tableMetadata = palloc0(sizeof(TableMetadata)); - tableMetadata->blockRowCount = TableBlockRowCount(relid); + found = ReadCStoreTables(relfilenode, &tableMetadata->blockRowCount); + if (!found) + { + ereport(ERROR, (errmsg("Relfilenode %d doesn't belong to a cstore table.", + relfilenode))); + } - ScanKeyInit(&scanKey[0], Anum_cstore_stripes_relid, - BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relid)); + ScanKeyInit(&scanKey[0], Anum_cstore_stripes_relfilenode, + BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relfilenode)); cstoreStripesOid = CStoreStripesRelationId(); cstoreStripes = heap_open(cstoreStripesOid, AccessShareLock); @@ -402,12 +407,13 @@ ReadTableMetadata(Oid relid) /* - * TableBlockRowCount returns block_row_count column from cstore_tables for a given relid. + * ReadCStoreTables reads corresponding record from cstore_tables. Returns false if + * table was not found in cstore_tables. */ -static int -TableBlockRowCount(Oid relid) +static bool +ReadCStoreTables(Oid relfilenode, uint64 *blockRowCount) { - int blockRowCount = 0; + bool found = false; Oid cstoreTablesOid = InvalidOid; Relation cstoreTables = NULL; Relation index = NULL; @@ -416,12 +422,29 @@ TableBlockRowCount(Oid relid) SysScanDesc scanDescriptor = NULL; HeapTuple heapTuple = NULL; - ScanKeyInit(&scanKey[0], Anum_cstore_tables_relid, - BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relid)); + ScanKeyInit(&scanKey[0], Anum_cstore_tables_relfilenode, + BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relfilenode)); cstoreTablesOid = CStoreTablesRelationId(); - cstoreTables = heap_open(cstoreTablesOid, AccessShareLock); - index = index_open(CStoreTablesIndexRelationId(), AccessShareLock); + cstoreTables = try_relation_open(cstoreTablesOid, AccessShareLock); + if (cstoreTables == NULL) + { + /* + * Extension has been dropped. This can be called while + * dropping extension or database via ObjectAccess(). + */ + return false; + } + + index = try_relation_open(CStoreTablesIndexRelationId(), AccessShareLock); + if (index == NULL) + { + heap_close(cstoreTables, NoLock); + + /* extension has been dropped */ + return false; + } + tupleDescriptor = RelationGetDescr(cstoreTables); scanDescriptor = systable_beginscan_ordered(cstoreTables, index, NULL, 1, scanKey); @@ -432,22 +455,24 @@ TableBlockRowCount(Oid relid) Datum datumArray[Natts_cstore_tables]; bool isNullArray[Natts_cstore_tables]; heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); - blockRowCount = DatumGetInt32(datumArray[Anum_cstore_tables_block_row_count - 1]); + *blockRowCount = DatumGetInt32(datumArray[Anum_cstore_tables_block_row_count - + 1]); + found = true; } systable_endscan_ordered(scanDescriptor); index_close(index, NoLock); heap_close(cstoreTables, NoLock); - return blockRowCount; + return found; } /* - * DeleteTableMetadataRowIfExists removes the row with given relid from cstore_stripes. + * DeleteTableMetadataRowIfExists removes the row with given relfilenode from cstore_stripes. */ -static void -DeleteTableMetadataRowIfExists(Oid relid) +void +DeleteTableMetadataRowIfExists(Oid relfilenode) { Oid cstoreTablesOid = InvalidOid; Relation cstoreTables = NULL; @@ -456,11 +481,17 @@ DeleteTableMetadataRowIfExists(Oid relid) SysScanDesc scanDescriptor = NULL; HeapTuple heapTuple = NULL; - ScanKeyInit(&scanKey[0], Anum_cstore_tables_relid, - BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relid)); + ScanKeyInit(&scanKey[0], Anum_cstore_tables_relfilenode, + BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relfilenode)); cstoreTablesOid = CStoreTablesRelationId(); - cstoreTables = heap_open(cstoreTablesOid, AccessShareLock); + cstoreTables = try_relation_open(cstoreTablesOid, AccessShareLock); + if (cstoreTables == NULL) + { + /* extension has been dropped */ + return; + } + index = index_open(CStoreTablesIndexRelationId(), AccessShareLock); scanDescriptor = systable_beginscan_ordered(cstoreTables, index, NULL, 1, scanKey); diff --git a/cstore_reader.c b/cstore_reader.c index 25702b272..2ee4101c0 100644 --- a/cstore_reader.c +++ b/cstore_reader.c @@ -80,14 +80,15 @@ static StringInfo ReadFromSmgr(Relation rel, uint64 offset, uint32 size); * read handle that's used during reading rows and finishing the read operation. */ TableReadState * -CStoreBeginRead(Oid relationId, TupleDesc tupleDescriptor, +CStoreBeginRead(Relation relation, TupleDesc tupleDescriptor, List *projectedColumnList, List *whereClauseList) { TableReadState *readState = NULL; TableMetadata *tableMetadata = NULL; MemoryContext stripeReadContext = NULL; + Oid relNode = relation->rd_node.relNode; - tableMetadata = ReadTableMetadata(relationId); + tableMetadata = ReadTableMetadata(relNode); /* * We allocate all stripe specific data in the stripeReadContext, and reset @@ -99,7 +100,7 @@ CStoreBeginRead(Oid relationId, TupleDesc tupleDescriptor, ALLOCSET_DEFAULT_SIZES); readState = palloc0(sizeof(TableReadState)); - readState->relationId = relationId; + readState->relation = relation; readState->tableMetadata = tableMetadata; readState->projectedColumnList = projectedColumnList; readState->whereClauseList = whereClauseList; @@ -308,7 +309,7 @@ CStoreTableRowCount(Relation relation) ListCell *stripeMetadataCell = NULL; uint64 totalRowCount = 0; - tableMetadata = ReadTableMetadata(relation->rd_id); + tableMetadata = ReadTableMetadata(relation->rd_node.relNode); foreach(stripeMetadataCell, tableMetadata->stripeMetadataList) { @@ -337,7 +338,7 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata, bool *projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList); - StripeSkipList *stripeSkipList = ReadStripeSkipList(RelationGetRelid(relation), + StripeSkipList *stripeSkipList = ReadStripeSkipList(relation->rd_node.relNode, stripeMetadata->id, tupleDescriptor, stripeMetadata->blockCount); diff --git a/cstore_tableam.c b/cstore_tableam.c index 243cbcb3e..d091916cf 100644 --- a/cstore_tableam.c +++ b/cstore_tableam.c @@ -97,13 +97,11 @@ cstore_init_write_state(Relation relation) TupleDesc tupdesc = RelationGetDescr(relation); elog(LOG, "initializing write state for relation %d", relation->rd_id); - CStoreWriteState = CStoreBeginWrite(relation->rd_id, + CStoreWriteState = CStoreBeginWrite(relation, cstoreOptions->compressionType, cstoreOptions->stripeRowCount, cstoreOptions->blockRowCount, tupdesc); - - CStoreWriteState->relation = relation; } } @@ -134,16 +132,12 @@ cstore_beginscan(Relation relation, Snapshot snapshot, ParallelTableScanDesc parallel_scan, uint32 flags) { - Oid relid = relation->rd_id; TupleDesc tupdesc = relation->rd_att; - CStoreOptions *cstoreOptions = NULL; TableReadState *readState = NULL; CStoreScanDesc scan = palloc(sizeof(CStoreScanDescData)); List *columnList = NIL; MemoryContext oldContext = MemoryContextSwitchTo(GetCStoreMemoryContext()); - cstoreOptions = CStoreTableAMGetOptions(); - scan->cs_base.rs_rd = relation; scan->cs_base.rs_snapshot = snapshot; scan->cs_base.rs_nkeys = nkeys; @@ -171,8 +165,7 @@ cstore_beginscan(Relation relation, Snapshot snapshot, columnList = lappend(columnList, var); } - readState = CStoreBeginRead(relid, tupdesc, columnList, NULL); - readState->relation = relation; + readState = CStoreBeginRead(relation, tupdesc, columnList, NULL); scan->cs_readState = readState; @@ -443,7 +436,7 @@ cstore_relation_set_new_filenode(Relation rel, *freezeXid = RecentXmin; *minmulti = GetOldestMultiXactId(); srel = RelationCreateStorage(*newrnode, persistence); - InitializeCStoreTableFile(rel->rd_id, rel, CStoreTableAMGetOptions()); + InitializeCStoreTableFile(newrnode->relNode, CStoreTableAMGetOptions()); smgrclose(srel); } diff --git a/cstore_writer.c b/cstore_writer.c index 91e73ffa8..728c855b4 100644 --- a/cstore_writer.c +++ b/cstore_writer.c @@ -58,7 +58,7 @@ static StringInfo CopyStringInfo(StringInfo sourceString); * will be added. */ TableWriteState * -CStoreBeginWrite(Oid relationId, +CStoreBeginWrite(Relation relation, CompressionType compressionType, uint64 stripeMaxRowCount, uint32 blockRowCount, TupleDesc tupleDescriptor) @@ -73,8 +73,9 @@ CStoreBeginWrite(Oid relationId, bool *columnMaskArray = NULL; BlockData *blockData = NULL; uint64 currentStripeId = 0; + Oid relNode = relation->rd_node.relNode; - tableMetadata = ReadTableMetadata(relationId); + tableMetadata = ReadTableMetadata(relNode); /* * If stripeMetadataList is not empty, jump to the position right after @@ -127,7 +128,7 @@ CStoreBeginWrite(Oid relationId, blockData = CreateEmptyBlockData(columnCount, columnMaskArray, blockRowCount); writeState = palloc0(sizeof(TableWriteState)); - writeState->relationId = relationId; + writeState->relation = relation; writeState->tableMetadata = tableMetadata; writeState->compressionType = compressionType; writeState->stripeMaxRowCount = stripeMaxRowCount; @@ -251,7 +252,8 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul * doesn't free it. */ MemoryContextSwitchTo(oldContext); - InsertStripeMetadataRow(writeState->relationId, &stripeMetadata); + InsertStripeMetadataRow(writeState->relation->rd_node.relNode, + &stripeMetadata); AppendStripeMetadata(tableMetadata, stripeMetadata); } else @@ -280,7 +282,8 @@ CStoreEndWrite(TableWriteState *writeState) MemoryContextReset(writeState->stripeWriteContext); MemoryContextSwitchTo(oldContext); - InsertStripeMetadataRow(writeState->relationId, &stripeMetadata); + InsertStripeMetadataRow(writeState->relation->rd_node.relNode, + &stripeMetadata); AppendStripeMetadata(writeState->tableMetadata, stripeMetadata); } @@ -543,7 +546,8 @@ FlushStripe(TableWriteState *writeState) } /* create skip list and footer buffers */ - SaveStripeSkipList(writeState->relationId, writeState->currentStripeId, + SaveStripeSkipList(writeState->relation->rd_node.relNode, + writeState->currentStripeId, stripeSkipList, tupleDescriptor); for (blockIndex = 0; blockIndex < blockCount; blockIndex++) From d37c717e143fdd07275393f5e81ebbc6780fc069 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Sat, 26 Sep 2020 12:39:16 -0700 Subject: [PATCH 2/4] Clean-up resources on drop --- cstore.c | 80 ++++++++++++++++++++++++++++++++++++++++ cstore.h | 1 + cstore_fdw.c | 50 ------------------------- cstore_metadata_tables.c | 11 ++++++ expected/am_drop.out | 15 ++++++++ expected/fdw_drop.out | 15 ++++++++ sql/am_drop.sql | 8 ++++ sql/fdw_drop.sql | 8 ++++ 8 files changed, 138 insertions(+), 50 deletions(-) diff --git a/cstore.c b/cstore.c index d6b6751e2..1d6e414ae 100644 --- a/cstore.c +++ b/cstore.c @@ -16,9 +16,13 @@ #include #include +#include "access/heapam.h" +#include "catalog/objectaccess.h" +#include "catalog/storage.h" #include "miscadmin.h" #include "utils/guc.h" #include "utils/rel.h" +#include "utils/relcache.h" #include "cstore.h" @@ -38,6 +42,11 @@ static const struct config_enum_entry cstore_compression_options[] = { NULL, 0, false } }; +static object_access_hook_type prevObjectAccess = NULL; + +static void ObjectAccess(ObjectAccessType access, Oid classId, Oid objectId, int subId, + void *arg); + void cstore_init() { @@ -78,6 +87,9 @@ cstore_init() NULL, NULL, NULL); + + prevObjectAccess = object_access_hook; + object_access_hook = ObjectAccess; } @@ -110,3 +122,71 @@ InitializeCStoreTableFile(Oid relNode, CStoreOptions *cstoreOptions) { InitCStoreTableMetadata(relNode, cstoreOptions->blockRowCount); } + + +/* + * Implements object_access_hook. One of the places this is called is just + * before dropping an object, which allows us to clean-up resources for + * cstore tables while the pg_class record for the table is still there. + */ +static void +ObjectAccess(ObjectAccessType access, Oid classId, Oid objectId, int subId, void *arg) +{ + if (prevObjectAccess) + { + prevObjectAccess(access, classId, objectId, subId, arg); + } + + /* + * Do nothing if this is not a DROP relation command. + */ + if (access != OAT_DROP || classId != RelationRelationId || OidIsValid(subId)) + { + return; + } + + if (IsCStoreFdwTable(objectId)) + { + /* + * Drop both metadata and storage. We need to drop storage here since + * we manage relfilenode for FDW tables in the extension. + */ + Relation rel = cstore_fdw_open(objectId, AccessExclusiveLock); + RelationOpenSmgr(rel); + RelationDropStorage(rel); + DeleteTableMetadataRowIfExists(rel->rd_node.relNode); + + /* keep the lock since we did physical changes to the relation */ + relation_close(rel, NoLock); + } + else + { + Oid relNode = InvalidOid; + Relation rel = try_relation_open(objectId, AccessExclusiveLock); + if (rel == NULL) + { + return; + } + + relNode = rel->rd_node.relNode; + if (IsCStoreStorage(relNode)) + { + /* + * Drop only metadata for table am cstore tables. Postgres manages + * storage for these tables, so we don't need to drop that. + */ + DeleteTableMetadataRowIfExists(relNode); + + /* keep the lock since we did physical changes to the relation */ + relation_close(rel, NoLock); + } + else + { + /* + * For non-cstore tables, we do nothing. + * Release the lock since we haven't changed the relation. + */ + relation_close(rel, AccessExclusiveLock); + } + } +} diff --git a/cstore.h b/cstore.h index dd5f9e6e1..919352c6c 100644 --- a/cstore.h +++ b/cstore.h @@ -283,6 +283,7 @@ extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer, extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType); /* cstore_metadata_tables.c */ +extern bool IsCStoreStorage(Oid relfilenode); extern void DeleteTableMetadataRowIfExists(Oid relfilenode); extern void InitCStoreTableMetadata(Oid relfilenode, int blockRowCount); extern void InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe); diff --git a/cstore_fdw.c b/cstore_fdw.c index f9f886f79..d4c5c1ec1 100644 --- a/cstore_fdw.c +++ b/cstore_fdw.c @@ -126,7 +126,6 @@ static uint64 CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString); static uint64 CopyOutCStoreTable(CopyStmt *copyStatement, const char *queryString); static void CStoreProcessAlterTableCommand(AlterTableStmt *alterStatement); -static List * DroppedCStoreRelidList(DropStmt *dropStatement); static List * FindCStoreTables(List *tableList); static List * OpenRelationsForTruncate(List *cstoreTableList); static void FdwNewRelFileNode(Relation relation); @@ -315,25 +314,6 @@ CStoreProcessUtility(Node * parseTree, const char * queryString, destReceiver, completionTag); } } - else if (nodeTag(parseTree) == T_DropStmt) - { - List *dropRelids = DroppedCStoreRelidList((DropStmt *) parseTree); - ListCell *lc = NULL; - - /* drop smgr storage */ - foreach(lc, dropRelids) - { - Oid relid = lfirst_oid(lc); - Relation relation = cstore_fdw_open(relid, AccessExclusiveLock); - - RelationOpenSmgr(relation); - RelationDropStorage(relation); - heap_close(relation, AccessExclusiveLock); - } - - CALL_PREVIOUS_UTILITY(parseTree, queryString, context, paramListInfo, - destReceiver, completionTag); - } else if (nodeTag(parseTree) == T_TruncateStmt) { TruncateStmt *truncateStatement = (TruncateStmt *) parseTree; @@ -723,36 +703,6 @@ CStoreProcessAlterTableCommand(AlterTableStmt *alterStatement) } -/* - * DropppedCStoreRelidList extracts and returns the list of cstore relids - * from DROP table statement - */ -static List * -DroppedCStoreRelidList(DropStmt *dropStatement) -{ - List *droppedCStoreRelidList = NIL; - - if (dropStatement->removeType == OBJECT_FOREIGN_TABLE) - { - ListCell *dropObjectCell = NULL; - foreach(dropObjectCell, dropStatement->objects) - { - List *tableNameList = (List *) lfirst(dropObjectCell); - RangeVar *rangeVar = makeRangeVarFromNameList(tableNameList); - - Oid relationId = RangeVarGetRelid(rangeVar, AccessShareLock, true); - if (CStoreTable(relationId)) - { - droppedCStoreRelidList = lappend_oid(droppedCStoreRelidList, - relationId); - } - } - } - - return droppedCStoreRelidList; -} - - /* FindCStoreTables returns list of CStore tables from given table list */ static List * FindCStoreTables(List *tableList) diff --git a/cstore_metadata_tables.c b/cstore_metadata_tables.c index 4459d3009..e1f1caedf 100644 --- a/cstore_metadata_tables.c +++ b/cstore_metadata_tables.c @@ -94,6 +94,17 @@ static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm); #define Anum_cstore_skipnodes_value_compression_type 12 +/* + * IsCStoreStorage returns if relfilenode belongs to a cstore table. + */ +bool +IsCStoreStorage(Oid relfilenode) +{ + uint64 blockRowCount = 0; + return ReadCStoreTables(relfilenode, &blockRowCount); +} + + /* * InitCStoreTableMetadata adds a record for the given relation in cstore_table. */ diff --git a/expected/am_drop.out b/expected/am_drop.out index e1c634d7f..c1fc60519 100644 --- a/expected/am_drop.out +++ b/expected/am_drop.out @@ -12,14 +12,29 @@ -- 'postgres' directory is excluded from comparison to have the same result. -- store postgres database oid SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset +SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset -- DROP cstore_fdw tables DROP TABLE contestant; DROP TABLE contestant_compressed; +-- make sure DROP deletes metadata +SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables; + ?column? +---------- + 2 +(1 row) + -- Create a cstore_fdw table under a schema and drop it. CREATE SCHEMA test_schema; CREATE TABLE test_schema.test_table(data int) USING cstore_tableam; +SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset DROP SCHEMA test_schema CASCADE; NOTICE: drop cascades to table test_schema.test_table +SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables; + ?column? +---------- + 1 +(1 row) + SELECT current_database() datname \gset CREATE DATABASE db_to_drop; \c db_to_drop diff --git a/expected/fdw_drop.out b/expected/fdw_drop.out index 926f69337..24c0f518d 100644 --- a/expected/fdw_drop.out +++ b/expected/fdw_drop.out @@ -12,14 +12,29 @@ -- 'postgres' directory is excluded from comparison to have the same result. -- store postgres database oid SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset +SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset -- DROP cstore_fdw tables DROP FOREIGN TABLE contestant; DROP FOREIGN TABLE contestant_compressed; +-- make sure DROP deletes metadata +SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables; + ?column? +---------- + 2 +(1 row) + -- Create a cstore_fdw table under a schema and drop it. CREATE SCHEMA test_schema; CREATE FOREIGN TABLE test_schema.test_table(data int) SERVER cstore_server; +SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset DROP SCHEMA test_schema CASCADE; NOTICE: drop cascades to foreign table test_schema.test_table +SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables; + ?column? +---------- + 1 +(1 row) + SELECT current_database() datname \gset CREATE DATABASE db_to_drop; \c db_to_drop diff --git a/sql/am_drop.sql b/sql/am_drop.sql index f92f90b9d..06873aa6e 100644 --- a/sql/am_drop.sql +++ b/sql/am_drop.sql @@ -15,14 +15,22 @@ -- store postgres database oid SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset +SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset + -- DROP cstore_fdw tables DROP TABLE contestant; DROP TABLE contestant_compressed; +-- make sure DROP deletes metadata +SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables; + -- Create a cstore_fdw table under a schema and drop it. CREATE SCHEMA test_schema; CREATE TABLE test_schema.test_table(data int) USING cstore_tableam; + +SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset DROP SCHEMA test_schema CASCADE; +SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables; SELECT current_database() datname \gset diff --git a/sql/fdw_drop.sql b/sql/fdw_drop.sql index c64b5c99b..7c6dd5c6e 100644 --- a/sql/fdw_drop.sql +++ b/sql/fdw_drop.sql @@ -15,14 +15,22 @@ -- store postgres database oid SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset +SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset + -- DROP cstore_fdw tables DROP FOREIGN TABLE contestant; DROP FOREIGN TABLE contestant_compressed; +-- make sure DROP deletes metadata +SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables; + -- Create a cstore_fdw table under a schema and drop it. CREATE SCHEMA test_schema; CREATE FOREIGN TABLE test_schema.test_table(data int) SERVER cstore_server; + +SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset DROP SCHEMA test_schema CASCADE; +SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables; SELECT current_database() datname \gset From a87c15a1e1078343e6cef85127b0e3eff7f4d1ca Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Thu, 1 Oct 2020 21:09:47 -0700 Subject: [PATCH 3/4] Address feedback --- cstore.c | 80 ------------------------------------ cstore.h | 2 - cstore_fdw.c | 82 +++++++++++++++++++++++++++++++++---- cstore_metadata_tables.c | 11 ----- cstore_tableam.c | 87 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 162 insertions(+), 100 deletions(-) diff --git a/cstore.c b/cstore.c index 1d6e414ae..d6b6751e2 100644 --- a/cstore.c +++ b/cstore.c @@ -16,13 +16,9 @@ #include #include -#include "access/heapam.h" -#include "catalog/objectaccess.h" -#include "catalog/storage.h" #include "miscadmin.h" #include "utils/guc.h" #include "utils/rel.h" -#include "utils/relcache.h" #include "cstore.h" @@ -42,11 +38,6 @@ static const struct config_enum_entry cstore_compression_options[] = { NULL, 0, false } }; -static object_access_hook_type prevObjectAccess = NULL; - -static void ObjectAccess(ObjectAccessType access, Oid classId, Oid objectId, int subId, - void *arg); - void cstore_init() { @@ -87,9 +78,6 @@ cstore_init() NULL, NULL, NULL); - - prevObjectAccess = object_access_hook; - object_access_hook = ObjectAccess; } @@ -122,71 +110,3 @@ InitializeCStoreTableFile(Oid relNode, CStoreOptions *cstoreOptions) { InitCStoreTableMetadata(relNode, cstoreOptions->blockRowCount); } - - -/* - * Implements object_access_hook. One of the places this is called is just - * before dropping an object, which allows us to clean-up resources for - * cstore tables while the pg_class record for the table is still there. - */ -static void -ObjectAccess(ObjectAccessType access, Oid classId, Oid objectId, int subId, void *arg) -{ - if (prevObjectAccess) - { - prevObjectAccess(access, classId, objectId, subId, arg); - } - - /* - * Do nothing if this is not a DROP relation command. - */ - if (access != OAT_DROP || classId != RelationRelationId || OidIsValid(subId)) - { - return; - } - - if (IsCStoreFdwTable(objectId)) - { - /* - * Drop both metadata and storage. We need to drop storage here since - * we manage relfilenode for FDW tables in the extension. - */ - Relation rel = cstore_fdw_open(objectId, AccessExclusiveLock); - RelationOpenSmgr(rel); - RelationDropStorage(rel); - DeleteTableMetadataRowIfExists(rel->rd_node.relNode); - - /* keep the lock since we did physical changes to the relation */ - relation_close(rel, NoLock); - } - else - { - Oid relNode = InvalidOid; - Relation rel = try_relation_open(objectId, AccessExclusiveLock); - if (rel == NULL) - { - return; - } - - relNode = rel->rd_node.relNode; - if (IsCStoreStorage(relNode)) - { - /* - * Drop only metadata for table am cstore tables. Postgres manages - * storage for these tables, so we don't need to drop that. - */ - DeleteTableMetadataRowIfExists(relNode); - - /* keep the lock since we did physical changes to the relation */ - relation_close(rel, NoLock); - } - else - { - /* - * For non-cstore tables, we do nothing. - * Release the lock since we haven't changed the relation. - */ - relation_close(rel, AccessExclusiveLock); - } - } -} diff --git a/cstore.h b/cstore.h index 919352c6c..ef937ba3c 100644 --- a/cstore.h +++ b/cstore.h @@ -249,8 +249,6 @@ extern void cstore_init(void); extern CompressionType ParseCompressionType(const char *compressionTypeString); extern void InitializeCStoreTableFile(Oid relNode, CStoreOptions *cstoreOptions); -extern bool IsCStoreFdwTable(Oid relationId); -extern Relation cstore_fdw_open(Oid relationId, LOCKMODE lockmode); /* Function declarations for writing to a cstore file */ extern TableWriteState * CStoreBeginWrite(Relation relation, diff --git a/cstore_fdw.c b/cstore_fdw.c index d4c5c1ec1..406a153c4 100644 --- a/cstore_fdw.c +++ b/cstore_fdw.c @@ -25,6 +25,7 @@ #include "catalog/catalog.h" #include "catalog/indexing.h" #include "catalog/namespace.h" +#include "catalog/objectaccess.h" #include "catalog/pg_foreign_table.h" #include "catalog/pg_namespace.h" #include "catalog/storage.h" @@ -54,6 +55,7 @@ #include "parser/parser.h" #include "parser/parse_coerce.h" #include "parser/parse_type.h" +#include "storage/lmgr.h" #include "storage/smgr.h" #include "tcop/utility.h" #include "utils/builtins.h" @@ -105,6 +107,8 @@ static const CStoreValidOption ValidOptionArray[] = { OPTION_NAME_BLOCK_ROW_COUNT, ForeignTableRelationId } }; +static object_access_hook_type prevObjectAccessHook = NULL; + /* local functions forward declarations */ #if PG_VERSION_NUM >= 100000 static void CStoreProcessUtility(PlannedStmt *plannedStatement, const char *queryString, @@ -130,7 +134,8 @@ static List * FindCStoreTables(List *tableList); static List * OpenRelationsForTruncate(List *cstoreTableList); static void FdwNewRelFileNode(Relation relation); static void TruncateCStoreTables(List *cstoreRelationList); -static bool CStoreServer(ForeignServer *server); +static bool IsCStoreFdwTable(Oid relationId); +static bool IsCStoreServer(ForeignServer *server); static bool DistributedTable(Oid relationId); static bool DistributedWorkerCopy(CopyStmt *copyStatement); static StringInfo OptionNamesString(Oid currentContextId); @@ -187,7 +192,11 @@ static bool CStoreIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte); #endif static void cstore_fdw_initrel(Relation rel); +static Relation cstore_fdw_open(Oid relationId, LOCKMODE lockmode); static Relation cstore_fdw_openrv(RangeVar *relation, LOCKMODE lockmode); +static void CStoreFdwObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId, + int subId, + void *arg); PG_FUNCTION_INFO_V1(cstore_ddl_event_end_trigger); PG_FUNCTION_INFO_V1(cstore_table_size); @@ -209,6 +218,8 @@ cstore_fdw_init() { PreviousProcessUtilityHook = ProcessUtility_hook; ProcessUtility_hook = CStoreProcessUtility; + prevObjectAccessHook = object_access_hook; + object_access_hook = CStoreFdwObjectAccessHook; } @@ -251,7 +262,7 @@ cstore_ddl_event_end_trigger(PG_FUNCTION_ARGS) bool missingOK = false; ForeignServer *server = GetForeignServerByName(serverName, missingOK); - if (CStoreServer(server)) + if (IsCStoreServer(server)) { Oid relationId = RangeVarGetRelid(createStatement->base.relation, AccessShareLock, false); @@ -358,7 +369,6 @@ CStoreProcessUtility(Node * parseTree, const char * queryString, CALL_PREVIOUS_UTILITY(parseTree, queryString, context, paramListInfo, destReceiver, completionTag); } - /* handle other utility statements */ else { @@ -895,7 +905,7 @@ IsCStoreFdwTable(Oid relationId) { ForeignTable *foreignTable = GetForeignTable(relationId); ForeignServer *server = GetForeignServer(foreignTable->serverid); - if (CStoreServer(server)) + if (IsCStoreServer(server)) { cstoreTable = true; } @@ -906,11 +916,11 @@ IsCStoreFdwTable(Oid relationId) /* - * CStoreServer checks if the given foreign server belongs to cstore_fdw. If it + * IsCStoreServer checks if the given foreign server belongs to cstore_fdw. If it * does, the function returns true. Otherwise, it returns false. */ static bool -CStoreServer(ForeignServer *server) +IsCStoreServer(ForeignServer *server) { ForeignDataWrapper *foreignDataWrapper = GetForeignDataWrapper(server->fdwid); bool cstoreServer = false; @@ -2143,7 +2153,7 @@ cstore_fdw_initrel(Relation rel) } -Relation +static Relation cstore_fdw_open(Oid relationId, LOCKMODE lockmode) { Relation rel = heap_open(relationId, lockmode); @@ -2163,3 +2173,61 @@ cstore_fdw_openrv(RangeVar *relation, LOCKMODE lockmode) return rel; } + + +/* + * Implements object_access_hook. One of the places this is called is just + * before dropping an object, which allows us to clean-up resources for + * cstore tables. + * + * When cleaning up resources, we need to have access to the pg_class record + * for the table so we can indentify the relfilenode belonging to the relation. + * We don't have access to this information in sql_drop event triggers, since + * the relation has already been dropped there. object_access_hook is called + * __before__ dropping tables, so we still have access to the pg_class + * entry here. + * + * Note that the utility hook is called once per __command__, and not for + * every object dropped, and since a drop can cascade to other objects, it + * is difficult to get full set of dropped objects in the utility hook. + * But object_access_hook is called once per dropped object, so it is + * much easier to clean-up all dropped objects here. + */ +static void +CStoreFdwObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId, + int subId, void *arg) +{ + if (prevObjectAccessHook) + { + prevObjectAccessHook(access, classId, objectId, subId, arg); + } + + /* + * Do nothing if this is not a DROP relation command. + */ + if (access != OAT_DROP || classId != RelationRelationId || OidIsValid(subId)) + { + return; + } + + /* + * Lock relation to prevent it from being dropped and to avoid + * race conditions in the next if block. + */ + LockRelationOid(objectId, AccessShareLock); + + if (IsCStoreFdwTable(objectId)) + { + /* + * Drop both metadata and storage. We need to drop storage here since + * we manage relfilenode for FDW tables in the extension. + */ + Relation rel = cstore_fdw_open(objectId, AccessExclusiveLock); + RelationOpenSmgr(rel); + RelationDropStorage(rel); + DeleteTableMetadataRowIfExists(rel->rd_node.relNode); + + /* keep the lock since we did physical changes to the relation */ + relation_close(rel, NoLock); + } +} diff --git a/cstore_metadata_tables.c b/cstore_metadata_tables.c index e1f1caedf..4459d3009 100644 --- a/cstore_metadata_tables.c +++ b/cstore_metadata_tables.c @@ -94,17 +94,6 @@ static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm); #define Anum_cstore_skipnodes_value_compression_type 12 -/* - * IsCStoreStorage returns if relfilenode belongs to a cstore table. - */ -bool -IsCStoreStorage(Oid relfilenode) -{ - uint64 blockRowCount = 0; - return ReadCStoreTables(relfilenode, &blockRowCount); -} - - /* * InitCStoreTableMetadata adds a record for the given relation in cstore_table. */ diff --git a/cstore_tableam.c b/cstore_tableam.c index d091916cf..6d02ebe24 100644 --- a/cstore_tableam.c +++ b/cstore_tableam.c @@ -14,6 +14,8 @@ #include "access/xact.h" #include "catalog/catalog.h" #include "catalog/index.h" +#include "catalog/objectaccess.h" +#include "catalog/pg_am.h" #include "catalog/storage.h" #include "catalog/storage_xlog.h" #include "commands/progress.h" @@ -30,10 +32,13 @@ #include "storage/smgr.h" #include "utils/builtins.h" #include "utils/rel.h" +#include "utils/syscache.h" #include "cstore.h" #include "cstore_tableam.h" +#define CSTORE_TABLEAM_NAME "cstore_tableam" + typedef struct CStoreScanDescData { TableScanDescData cs_base; @@ -45,6 +50,13 @@ typedef struct CStoreScanDescData *CStoreScanDesc; static TableWriteState *CStoreWriteState = NULL; static ExecutorEnd_hook_type PreviousExecutorEndHook = NULL; static MemoryContext CStoreContext = NULL; +static object_access_hook_type prevObjectAccessHook = NULL; + +/* forward declaration for static functions */ +static void CStoreTableAMObjectAccessHook(ObjectAccessType access, Oid classId, Oid + objectId, int subId, + void *arg); +static bool IsCStoreTableAmTable(Oid relationId); static CStoreOptions * CStoreTableAMGetOptions(void) @@ -624,6 +636,8 @@ cstore_tableam_init() { PreviousExecutorEndHook = ExecutorEnd_hook; ExecutorEnd_hook = CStoreExecutorEnd; + prevObjectAccessHook = object_access_hook; + object_access_hook = CStoreTableAMObjectAccessHook; } @@ -634,6 +648,79 @@ cstore_tableam_finish() } +/* + * Implements object_access_hook. One of the places this is called is just + * before dropping an object, which allows us to clean-up resources for + * cstore tables. + * + * See the comments for CStoreFdwObjectAccessHook for more details. + */ +static void +CStoreTableAMObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId, int + subId, + void *arg) +{ + if (prevObjectAccessHook) + { + prevObjectAccessHook(access, classId, objectId, subId, arg); + } + + /* + * Do nothing if this is not a DROP relation command. + */ + if (access != OAT_DROP || classId != RelationRelationId || OidIsValid(subId)) + { + return; + } + + /* + * Lock relation to prevent it from being dropped and to avoid + * race conditions in the next if block. + */ + LockRelationOid(objectId, AccessShareLock); + + if (IsCStoreTableAmTable(objectId)) + { + /* + * Drop metadata. No need to drop storage here since for + * tableam tables storage is managed by postgres. + */ + Relation rel = table_open(objectId, AccessExclusiveLock); + DeleteTableMetadataRowIfExists(rel->rd_node.relNode); + + /* keep the lock since we did physical changes to the relation */ + table_close(rel, NoLock); + } +} + + +/* + * IsCStoreTableAmTable returns true if relation has cstore_tableam + * access method. This can be called before extension creation. + */ +static bool +IsCStoreTableAmTable(Oid relationId) +{ + bool result; + Relation rel; + + if (!OidIsValid(relationId)) + { + return false; + } + + /* + * Lock relation to prevent it from being dropped & + * avoid race conditions. + */ + rel = relation_open(relationId, AccessShareLock); + result = rel->rd_tableam == GetCstoreTableAmRoutine(); + relation_close(rel, NoLock); + + return result; +} + + static const TableAmRoutine cstore_am_methods = { .type = T_TableAmRoutine, From a70b0c362e71d06db62bcc65e0ae973dbcb0278e Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Thu, 1 Oct 2020 21:23:06 -0700 Subject: [PATCH 4/4] Rename cstore_tables to cstore_data_files --- cstore.c | 11 ---- cstore.h | 18 +++--- cstore_fdw--1.7.sql | 10 +-- cstore_fdw.c | 19 ++---- cstore_metadata_tables.c | 128 ++++++++++++++++++++------------------- cstore_reader.c | 18 +++--- cstore_tableam.c | 5 +- cstore_writer.c | 26 ++++---- expected/am_drop.out | 8 +-- expected/fdw_drop.out | 8 +-- sql/am_drop.sql | 8 +-- sql/fdw_drop.sql | 8 +-- 12 files changed, 125 insertions(+), 142 deletions(-) diff --git a/cstore.c b/cstore.c index d6b6751e2..a724a62a0 100644 --- a/cstore.c +++ b/cstore.c @@ -99,14 +99,3 @@ ParseCompressionType(const char *compressionTypeString) return compressionType; } - - -/* - * InitializeCStoreTableFile initializes metadata for the given relation - * file node. - */ -void -InitializeCStoreTableFile(Oid relNode, CStoreOptions *cstoreOptions) -{ - InitCStoreTableMetadata(relNode, cstoreOptions->blockRowCount); -} diff --git a/cstore.h b/cstore.h index ef937ba3c..8efb0e6af 100644 --- a/cstore.h +++ b/cstore.h @@ -87,12 +87,12 @@ typedef struct StripeMetadata } StripeMetadata; -/* TableMetadata represents the metadata of a cstore file. */ -typedef struct TableMetadata +/* DataFileMetadata represents the metadata of a cstore file. */ +typedef struct DataFileMetadata { List *stripeMetadataList; uint64 blockRowCount; -} TableMetadata; +} DataFileMetadata; /* ColumnBlockSkipNode contains statistics for a ColumnBlockData. */ @@ -192,7 +192,7 @@ typedef struct StripeBuffers /* TableReadState represents state of a cstore file read operation. */ typedef struct TableReadState { - TableMetadata *tableMetadata; + DataFileMetadata *datafileMetadata; StripeMetadata *currentStripeMetadata; TupleDesc tupleDescriptor; Relation relation; @@ -217,7 +217,7 @@ typedef struct TableReadState /* TableWriteState represents state of a cstore file write operation. */ typedef struct TableWriteState { - TableMetadata *tableMetadata; + DataFileMetadata *datafileMetadata; CompressionType compressionType; TupleDesc tupleDescriptor; FmgrInfo **comparisonFunctionArray; @@ -248,7 +248,6 @@ extern int cstore_block_row_count; extern void cstore_init(void); extern CompressionType ParseCompressionType(const char *compressionTypeString); -extern void InitializeCStoreTableFile(Oid relNode, CStoreOptions *cstoreOptions); /* Function declarations for writing to a cstore file */ extern TableWriteState * CStoreBeginWrite(Relation relation, @@ -281,11 +280,10 @@ extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer, extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType); /* cstore_metadata_tables.c */ -extern bool IsCStoreStorage(Oid relfilenode); -extern void DeleteTableMetadataRowIfExists(Oid relfilenode); -extern void InitCStoreTableMetadata(Oid relfilenode, int blockRowCount); +extern void DeleteDataFileMetadataRowIfExists(Oid relfilenode); +extern void InitCStoreDataFileMetadata(Oid relfilenode, int blockRowCount); extern void InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe); -extern TableMetadata * ReadTableMetadata(Oid relfilenode); +extern DataFileMetadata * ReadDataFileMetadata(Oid relfilenode); extern void SaveStripeSkipList(Oid relfilenode, uint64 stripe, StripeSkipList *stripeSkipList, TupleDesc tupleDescriptor); diff --git a/cstore_fdw--1.7.sql b/cstore_fdw--1.7.sql index b3470b6a5..c19bb1449 100644 --- a/cstore_fdw--1.7.sql +++ b/cstore_fdw--1.7.sql @@ -31,7 +31,7 @@ RETURNS bigint AS 'MODULE_PATHNAME' LANGUAGE C STRICT; -CREATE TABLE cstore_tables ( +CREATE TABLE cstore_data_files ( relfilenode oid NOT NULL, block_row_count int NOT NULL, version_major bigint NOT NULL, @@ -39,7 +39,7 @@ CREATE TABLE cstore_tables ( PRIMARY KEY (relfilenode) ) WITH (user_catalog_table = true); -COMMENT ON TABLE cstore_tables IS 'CStore table wide metadata'; +COMMENT ON TABLE cstore_data_files IS 'CStore data file wide metadata'; CREATE TABLE cstore_stripes ( relfilenode oid NOT NULL, @@ -51,10 +51,10 @@ CREATE TABLE cstore_stripes ( block_row_count int NOT NULL, row_count bigint NOT NULL, PRIMARY KEY (relfilenode, stripe), - FOREIGN KEY (relfilenode) REFERENCES cstore_tables(relfilenode) ON DELETE CASCADE INITIALLY DEFERRED + FOREIGN KEY (relfilenode) REFERENCES cstore_data_files(relfilenode) ON DELETE CASCADE INITIALLY DEFERRED ) WITH (user_catalog_table = true); -COMMENT ON TABLE cstore_tables IS 'CStore per stripe metadata'; +COMMENT ON TABLE cstore_stripes IS 'CStore per stripe metadata'; CREATE TABLE cstore_skipnodes ( relfilenode oid NOT NULL, @@ -73,4 +73,4 @@ CREATE TABLE cstore_skipnodes ( FOREIGN KEY (relfilenode, stripe) REFERENCES cstore_stripes(relfilenode, stripe) ON DELETE CASCADE INITIALLY DEFERRED ) WITH (user_catalog_table = true); -COMMENT ON TABLE cstore_tables IS 'CStore per block metadata'; +COMMENT ON TABLE cstore_skipnodes IS 'CStore per block metadata'; diff --git a/cstore_fdw.c b/cstore_fdw.c index 406a153c4..33a29ad32 100644 --- a/cstore_fdw.c +++ b/cstore_fdw.c @@ -267,16 +267,8 @@ cstore_ddl_event_end_trigger(PG_FUNCTION_ARGS) Oid relationId = RangeVarGetRelid(createStatement->base.relation, AccessShareLock, false); Relation relation = cstore_fdw_open(relationId, AccessExclusiveLock); - - /* - * Make sure database directory exists before creating a table. - * This is necessary when a foreign server is created inside - * a template database and a new database is created out of it. - * We have no chance to hook into server creation to create data - * directory for it during database creation time. - */ - InitializeCStoreTableFile(relation->rd_node.relNode, - CStoreGetOptions(relationId)); + CStoreOptions *options = CStoreGetOptions(relationId); + InitCStoreDataFileMetadata(relation->rd_node.relNode, options->blockRowCount); heap_close(relation, AccessExclusiveLock); } } @@ -369,6 +361,7 @@ CStoreProcessUtility(Node * parseTree, const char * queryString, CALL_PREVIOUS_UTILITY(parseTree, queryString, context, paramListInfo, destReceiver, completionTag); } + /* handle other utility statements */ else { @@ -782,12 +775,12 @@ TruncateCStoreTables(List *cstoreRelationList) { Relation relation = (Relation) lfirst(relationCell); Oid relationId = relation->rd_id; + CStoreOptions *options = CStoreGetOptions(relationId); Assert(IsCStoreFdwTable(relationId)); FdwNewRelFileNode(relation); - InitializeCStoreTableFile(relation->rd_node.relNode, - CStoreGetOptions(relationId)); + InitCStoreDataFileMetadata(relation->rd_node.relNode, options->blockRowCount); } } @@ -2225,7 +2218,7 @@ CStoreFdwObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId, Relation rel = cstore_fdw_open(objectId, AccessExclusiveLock); RelationOpenSmgr(rel); RelationDropStorage(rel); - DeleteTableMetadataRowIfExists(rel->rd_node.relNode); + DeleteDataFileMetadataRowIfExists(rel->rd_node.relNode); /* keep the lock since we did physical changes to the relation */ relation_close(rel, NoLock); diff --git a/cstore_metadata_tables.c b/cstore_metadata_tables.c index 4459d3009..3ebee02e7 100644 --- a/cstore_metadata_tables.c +++ b/cstore_metadata_tables.c @@ -45,12 +45,12 @@ typedef struct static Oid CStoreStripesRelationId(void); static Oid CStoreStripesIndexRelationId(void); -static Oid CStoreTablesRelationId(void); -static Oid CStoreTablesIndexRelationId(void); +static Oid CStoreDataFilesRelationId(void); +static Oid CStoreDataFilesIndexRelationId(void); static Oid CStoreSkipNodesRelationId(void); static Oid CStoreSkipNodesIndexRelationId(void); static Oid CStoreNamespaceId(void); -static bool ReadCStoreTables(Oid relfilenode, uint64 *blockRowCount); +static bool ReadCStoreDataFiles(Oid relfilenode, uint64 *blockRowCount); static ModifyState * StartModifyRelation(Relation rel); static void InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values, bool *nulls); @@ -61,11 +61,11 @@ static bytea * DatumToBytea(Datum value, Form_pg_attribute attrForm); static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm); /* constants for cstore_table */ -#define Natts_cstore_tables 4 -#define Anum_cstore_tables_relfilenode 1 -#define Anum_cstore_tables_block_row_count 2 -#define Anum_cstore_tables_version_major 3 -#define Anum_cstore_tables_version_minor 4 +#define Natts_cstore_data_files 4 +#define Anum_cstore_data_files_relfilenode 1 +#define Anum_cstore_data_files_block_row_count 2 +#define Anum_cstore_data_files_version_major 3 +#define Anum_cstore_data_files_version_minor 4 /* constants for cstore_stripe */ #define Natts_cstore_stripes 8 @@ -95,35 +95,36 @@ static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm); /* - * InitCStoreTableMetadata adds a record for the given relation in cstore_table. + * InitCStoreDataFileMetadata adds a record for the given relfilenode + * in cstore_data_files. */ void -InitCStoreTableMetadata(Oid relfilenode, int blockRowCount) +InitCStoreDataFileMetadata(Oid relfilenode, int blockRowCount) { - Oid cstoreTablesOid = InvalidOid; - Relation cstoreTables = NULL; + Oid cstoreDataFilesOid = InvalidOid; + Relation cstoreDataFiles = NULL; ModifyState *modifyState = NULL; - bool nulls[Natts_cstore_tables] = { 0 }; - Datum values[Natts_cstore_tables] = { + bool nulls[Natts_cstore_data_files] = { 0 }; + Datum values[Natts_cstore_data_files] = { ObjectIdGetDatum(relfilenode), Int32GetDatum(blockRowCount), Int32GetDatum(CSTORE_VERSION_MAJOR), Int32GetDatum(CSTORE_VERSION_MINOR) }; - DeleteTableMetadataRowIfExists(relfilenode); + DeleteDataFileMetadataRowIfExists(relfilenode); - cstoreTablesOid = CStoreTablesRelationId(); - cstoreTables = heap_open(cstoreTablesOid, RowExclusiveLock); + cstoreDataFilesOid = CStoreDataFilesRelationId(); + cstoreDataFiles = heap_open(cstoreDataFilesOid, RowExclusiveLock); - modifyState = StartModifyRelation(cstoreTables); + modifyState = StartModifyRelation(cstoreDataFiles); InsertTupleAndEnforceConstraints(modifyState, values, nulls); FinishModifyRelation(modifyState); CommandCounterIncrement(); - heap_close(cstoreTables, NoLock); + heap_close(cstoreDataFiles, NoLock); } @@ -338,11 +339,11 @@ InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe) /* - * ReadTableMetadata constructs TableMetadata for a given relfilenode by reading - * from cstore_tables and cstore_stripes. + * ReadDataFileMetadata constructs DataFileMetadata for a given relfilenode by reading + * from cstore_data_files and cstore_stripes. */ -TableMetadata * -ReadTableMetadata(Oid relfilenode) +DataFileMetadata * +ReadDataFileMetadata(Oid relfilenode) { Oid cstoreStripesOid = InvalidOid; Relation cstoreStripes = NULL; @@ -353,8 +354,8 @@ ReadTableMetadata(Oid relfilenode) HeapTuple heapTuple; bool found = false; - TableMetadata *tableMetadata = palloc0(sizeof(TableMetadata)); - found = ReadCStoreTables(relfilenode, &tableMetadata->blockRowCount); + DataFileMetadata *datafileMetadata = palloc0(sizeof(DataFileMetadata)); + found = ReadCStoreDataFiles(relfilenode, &datafileMetadata->blockRowCount); if (!found) { ereport(ERROR, (errmsg("Relfilenode %d doesn't belong to a cstore table.", @@ -394,40 +395,41 @@ ReadTableMetadata(Oid relfilenode) stripeMetadata->rowCount = DatumGetInt64( datumArray[Anum_cstore_stripes_row_count - 1]); - tableMetadata->stripeMetadataList = lappend(tableMetadata->stripeMetadataList, - stripeMetadata); + datafileMetadata->stripeMetadataList = lappend( + datafileMetadata->stripeMetadataList, + stripeMetadata); } systable_endscan_ordered(scanDescriptor); index_close(index, NoLock); heap_close(cstoreStripes, NoLock); - return tableMetadata; + return datafileMetadata; } /* - * ReadCStoreTables reads corresponding record from cstore_tables. Returns false if - * table was not found in cstore_tables. + * ReadCStoreDataFiles reads corresponding record from cstore_data_files. Returns + * false if table was not found in cstore_data_files. */ static bool -ReadCStoreTables(Oid relfilenode, uint64 *blockRowCount) +ReadCStoreDataFiles(Oid relfilenode, uint64 *blockRowCount) { bool found = false; - Oid cstoreTablesOid = InvalidOid; - Relation cstoreTables = NULL; + Oid cstoreDataFilesOid = InvalidOid; + Relation cstoreDataFiles = NULL; Relation index = NULL; TupleDesc tupleDescriptor = NULL; ScanKeyData scanKey[1]; SysScanDesc scanDescriptor = NULL; HeapTuple heapTuple = NULL; - ScanKeyInit(&scanKey[0], Anum_cstore_tables_relfilenode, + ScanKeyInit(&scanKey[0], Anum_cstore_data_files_relfilenode, BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relfilenode)); - cstoreTablesOid = CStoreTablesRelationId(); - cstoreTables = try_relation_open(cstoreTablesOid, AccessShareLock); - if (cstoreTables == NULL) + cstoreDataFilesOid = CStoreDataFilesRelationId(); + cstoreDataFiles = try_relation_open(cstoreDataFilesOid, AccessShareLock); + if (cstoreDataFiles == NULL) { /* * Extension has been dropped. This can be called while @@ -436,77 +438,77 @@ ReadCStoreTables(Oid relfilenode, uint64 *blockRowCount) return false; } - index = try_relation_open(CStoreTablesIndexRelationId(), AccessShareLock); + index = try_relation_open(CStoreDataFilesIndexRelationId(), AccessShareLock); if (index == NULL) { - heap_close(cstoreTables, NoLock); + heap_close(cstoreDataFiles, NoLock); /* extension has been dropped */ return false; } - tupleDescriptor = RelationGetDescr(cstoreTables); + tupleDescriptor = RelationGetDescr(cstoreDataFiles); - scanDescriptor = systable_beginscan_ordered(cstoreTables, index, NULL, 1, scanKey); + scanDescriptor = systable_beginscan_ordered(cstoreDataFiles, index, NULL, 1, scanKey); heapTuple = systable_getnext(scanDescriptor); if (HeapTupleIsValid(heapTuple)) { - Datum datumArray[Natts_cstore_tables]; - bool isNullArray[Natts_cstore_tables]; + Datum datumArray[Natts_cstore_data_files]; + bool isNullArray[Natts_cstore_data_files]; heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); - *blockRowCount = DatumGetInt32(datumArray[Anum_cstore_tables_block_row_count - + *blockRowCount = DatumGetInt32(datumArray[Anum_cstore_data_files_block_row_count - 1]); found = true; } systable_endscan_ordered(scanDescriptor); index_close(index, NoLock); - heap_close(cstoreTables, NoLock); + heap_close(cstoreDataFiles, NoLock); return found; } /* - * DeleteTableMetadataRowIfExists removes the row with given relfilenode from cstore_stripes. + * DeleteDataFileMetadataRowIfExists removes the row with given relfilenode from cstore_stripes. */ void -DeleteTableMetadataRowIfExists(Oid relfilenode) +DeleteDataFileMetadataRowIfExists(Oid relfilenode) { - Oid cstoreTablesOid = InvalidOid; - Relation cstoreTables = NULL; + Oid cstoreDataFilesOid = InvalidOid; + Relation cstoreDataFiles = NULL; Relation index = NULL; ScanKeyData scanKey[1]; SysScanDesc scanDescriptor = NULL; HeapTuple heapTuple = NULL; - ScanKeyInit(&scanKey[0], Anum_cstore_tables_relfilenode, + ScanKeyInit(&scanKey[0], Anum_cstore_data_files_relfilenode, BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relfilenode)); - cstoreTablesOid = CStoreTablesRelationId(); - cstoreTables = try_relation_open(cstoreTablesOid, AccessShareLock); - if (cstoreTables == NULL) + cstoreDataFilesOid = CStoreDataFilesRelationId(); + cstoreDataFiles = try_relation_open(cstoreDataFilesOid, AccessShareLock); + if (cstoreDataFiles == NULL) { /* extension has been dropped */ return; } - index = index_open(CStoreTablesIndexRelationId(), AccessShareLock); + index = index_open(CStoreDataFilesIndexRelationId(), AccessShareLock); - scanDescriptor = systable_beginscan_ordered(cstoreTables, index, NULL, 1, scanKey); + scanDescriptor = systable_beginscan_ordered(cstoreDataFiles, index, NULL, 1, scanKey); heapTuple = systable_getnext(scanDescriptor); if (HeapTupleIsValid(heapTuple)) { - ModifyState *modifyState = StartModifyRelation(cstoreTables); + ModifyState *modifyState = StartModifyRelation(cstoreDataFiles); DeleteTupleAndEnforceConstraints(modifyState, heapTuple); FinishModifyRelation(modifyState); } systable_endscan_ordered(scanDescriptor); index_close(index, NoLock); - heap_close(cstoreTables, NoLock); + heap_close(cstoreDataFiles, NoLock); } @@ -711,24 +713,24 @@ CStoreStripesIndexRelationId(void) /* - * CStoreTablesRelationId returns relation id of cstore_tables. + * CStoreDataFilesRelationId returns relation id of cstore_data_files. * TODO: should we cache this similar to citus? */ static Oid -CStoreTablesRelationId(void) +CStoreDataFilesRelationId(void) { - return get_relname_relid("cstore_tables", CStoreNamespaceId()); + return get_relname_relid("cstore_data_files", CStoreNamespaceId()); } /* - * CStoreTablesIndexRelationId returns relation id of cstore_tables_idx. + * CStoreDataFilesIndexRelationId returns relation id of cstore_data_files_pkey. * TODO: should we cache this similar to citus? */ static Oid -CStoreTablesIndexRelationId(void) +CStoreDataFilesIndexRelationId(void) { - return get_relname_relid("cstore_tables_pkey", CStoreNamespaceId()); + return get_relname_relid("cstore_data_files_pkey", CStoreNamespaceId()); } diff --git a/cstore_reader.c b/cstore_reader.c index 2ee4101c0..66807ad08 100644 --- a/cstore_reader.c +++ b/cstore_reader.c @@ -84,11 +84,11 @@ CStoreBeginRead(Relation relation, TupleDesc tupleDescriptor, List *projectedColumnList, List *whereClauseList) { TableReadState *readState = NULL; - TableMetadata *tableMetadata = NULL; + DataFileMetadata *datafileMetadata = NULL; MemoryContext stripeReadContext = NULL; Oid relNode = relation->rd_node.relNode; - tableMetadata = ReadTableMetadata(relNode); + datafileMetadata = ReadDataFileMetadata(relNode); /* * We allocate all stripe specific data in the stripeReadContext, and reset @@ -101,7 +101,7 @@ CStoreBeginRead(Relation relation, TupleDesc tupleDescriptor, readState = palloc0(sizeof(TableReadState)); readState->relation = relation; - readState->tableMetadata = tableMetadata; + readState->datafileMetadata = datafileMetadata; readState->projectedColumnList = projectedColumnList; readState->whereClauseList = whereClauseList; readState->stripeBuffers = NULL; @@ -139,7 +139,7 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu { StripeBuffers *stripeBuffers = NULL; StripeMetadata *stripeMetadata = NULL; - List *stripeMetadataList = readState->tableMetadata->stripeMetadataList; + List *stripeMetadataList = readState->datafileMetadata->stripeMetadataList; uint32 stripeCount = list_length(stripeMetadataList); /* if we have read all stripes, return false */ @@ -229,8 +229,8 @@ void CStoreEndRead(TableReadState *readState) { MemoryContextDelete(readState->stripeReadContext); - list_free_deep(readState->tableMetadata->stripeMetadataList); - pfree(readState->tableMetadata); + list_free_deep(readState->datafileMetadata->stripeMetadataList); + pfree(readState->datafileMetadata); pfree(readState); } @@ -305,13 +305,13 @@ FreeBlockData(BlockData *blockData) uint64 CStoreTableRowCount(Relation relation) { - TableMetadata *tableMetadata = NULL; + DataFileMetadata *datafileMetadata = NULL; ListCell *stripeMetadataCell = NULL; uint64 totalRowCount = 0; - tableMetadata = ReadTableMetadata(relation->rd_node.relNode); + datafileMetadata = ReadDataFileMetadata(relation->rd_node.relNode); - foreach(stripeMetadataCell, tableMetadata->stripeMetadataList) + foreach(stripeMetadataCell, datafileMetadata->stripeMetadataList) { StripeMetadata *stripeMetadata = (StripeMetadata *) lfirst(stripeMetadataCell); totalRowCount += stripeMetadata->rowCount; diff --git a/cstore_tableam.c b/cstore_tableam.c index 6d02ebe24..4e7f22c31 100644 --- a/cstore_tableam.c +++ b/cstore_tableam.c @@ -443,12 +443,13 @@ cstore_relation_set_new_filenode(Relation rel, MultiXactId *minmulti) { SMgrRelation srel; + CStoreOptions *options = CStoreTableAMGetOptions(); Assert(persistence == RELPERSISTENCE_PERMANENT); *freezeXid = RecentXmin; *minmulti = GetOldestMultiXactId(); srel = RelationCreateStorage(*newrnode, persistence); - InitializeCStoreTableFile(newrnode->relNode, CStoreTableAMGetOptions()); + InitCStoreDataFileMetadata(newrnode->relNode, options->blockRowCount); smgrclose(srel); } @@ -686,7 +687,7 @@ CStoreTableAMObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId * tableam tables storage is managed by postgres. */ Relation rel = table_open(objectId, AccessExclusiveLock); - DeleteTableMetadataRowIfExists(rel->rd_node.relNode); + DeleteDataFileMetadataRowIfExists(rel->rd_node.relNode); /* keep the lock since we did physical changes to the relation */ table_close(rel, NoLock); diff --git a/cstore_writer.c b/cstore_writer.c index 728c855b4..c70b448c7 100644 --- a/cstore_writer.c +++ b/cstore_writer.c @@ -45,7 +45,7 @@ static void UpdateBlockSkipNodeMinMax(ColumnBlockSkipNode *blockSkipNode, int columnTypeLength, Oid columnCollation, FmgrInfo *comparisonFunction); static Datum DatumCopy(Datum datum, bool datumTypeByValue, int datumTypeLength); -static void AppendStripeMetadata(TableMetadata *tableMetadata, +static void AppendStripeMetadata(DataFileMetadata *datafileMetadata, StripeMetadata stripeMetadata); static StringInfo CopyStringInfo(StringInfo sourceString); @@ -64,7 +64,7 @@ CStoreBeginWrite(Relation relation, TupleDesc tupleDescriptor) { TableWriteState *writeState = NULL; - TableMetadata *tableMetadata = NULL; + DataFileMetadata *datafileMetadata = NULL; FmgrInfo **comparisonFunctionArray = NULL; MemoryContext stripeWriteContext = NULL; uint64 currentFileOffset = 0; @@ -75,18 +75,18 @@ CStoreBeginWrite(Relation relation, uint64 currentStripeId = 0; Oid relNode = relation->rd_node.relNode; - tableMetadata = ReadTableMetadata(relNode); + datafileMetadata = ReadDataFileMetadata(relNode); /* * If stripeMetadataList is not empty, jump to the position right after * the last position. */ - if (tableMetadata->stripeMetadataList != NIL) + if (datafileMetadata->stripeMetadataList != NIL) { StripeMetadata *lastStripe = NULL; uint64 lastStripeSize = 0; - lastStripe = llast(tableMetadata->stripeMetadataList); + lastStripe = llast(datafileMetadata->stripeMetadataList); lastStripeSize += lastStripe->dataLength; currentFileOffset = lastStripe->fileOffset + lastStripeSize; @@ -129,7 +129,7 @@ CStoreBeginWrite(Relation relation, writeState = palloc0(sizeof(TableWriteState)); writeState->relation = relation; - writeState->tableMetadata = tableMetadata; + writeState->datafileMetadata = datafileMetadata; writeState->compressionType = compressionType; writeState->stripeMaxRowCount = stripeMaxRowCount; writeState->blockRowCount = blockRowCount; @@ -164,7 +164,7 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul StripeBuffers *stripeBuffers = writeState->stripeBuffers; StripeSkipList *stripeSkipList = writeState->stripeSkipList; uint32 columnCount = writeState->tupleDescriptor->natts; - TableMetadata *tableMetadata = writeState->tableMetadata; + DataFileMetadata *datafileMetadata = writeState->datafileMetadata; const uint32 blockRowCount = writeState->blockRowCount; BlockData *blockData = writeState->blockData; MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext); @@ -254,7 +254,7 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul MemoryContextSwitchTo(oldContext); InsertStripeMetadataRow(writeState->relation->rd_node.relNode, &stripeMetadata); - AppendStripeMetadata(tableMetadata, stripeMetadata); + AppendStripeMetadata(datafileMetadata, stripeMetadata); } else { @@ -284,11 +284,11 @@ CStoreEndWrite(TableWriteState *writeState) MemoryContextSwitchTo(oldContext); InsertStripeMetadataRow(writeState->relation->rd_node.relNode, &stripeMetadata); - AppendStripeMetadata(writeState->tableMetadata, stripeMetadata); + AppendStripeMetadata(writeState->datafileMetadata, stripeMetadata); } MemoryContextDelete(writeState->stripeWriteContext); - list_free_deep(writeState->tableMetadata->stripeMetadataList); + list_free_deep(writeState->datafileMetadata->stripeMetadataList); pfree(writeState->comparisonFunctionArray); FreeBlockData(writeState->blockData); pfree(writeState); @@ -791,13 +791,13 @@ DatumCopy(Datum datum, bool datumTypeByValue, int datumTypeLength) * table footer's stripeMetadataList. */ static void -AppendStripeMetadata(TableMetadata *tableMetadata, StripeMetadata stripeMetadata) +AppendStripeMetadata(DataFileMetadata *datafileMetadata, StripeMetadata stripeMetadata) { StripeMetadata *stripeMetadataCopy = palloc0(sizeof(StripeMetadata)); memcpy(stripeMetadataCopy, &stripeMetadata, sizeof(StripeMetadata)); - tableMetadata->stripeMetadataList = lappend(tableMetadata->stripeMetadataList, - stripeMetadataCopy); + datafileMetadata->stripeMetadataList = lappend(datafileMetadata->stripeMetadataList, + stripeMetadataCopy); } diff --git a/expected/am_drop.out b/expected/am_drop.out index c1fc60519..26de328f6 100644 --- a/expected/am_drop.out +++ b/expected/am_drop.out @@ -12,12 +12,12 @@ -- 'postgres' directory is excluded from comparison to have the same result. -- store postgres database oid SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset -SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset +SELECT count(*) AS cstore_data_files_before_drop FROM cstore.cstore_data_files \gset -- DROP cstore_fdw tables DROP TABLE contestant; DROP TABLE contestant_compressed; -- make sure DROP deletes metadata -SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables; +SELECT :cstore_data_files_before_drop - count(*) FROM cstore.cstore_data_files; ?column? ---------- 2 @@ -26,10 +26,10 @@ SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables; -- Create a cstore_fdw table under a schema and drop it. CREATE SCHEMA test_schema; CREATE TABLE test_schema.test_table(data int) USING cstore_tableam; -SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset +SELECT count(*) AS cstore_data_files_before_drop FROM cstore.cstore_data_files \gset DROP SCHEMA test_schema CASCADE; NOTICE: drop cascades to table test_schema.test_table -SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables; +SELECT :cstore_data_files_before_drop - count(*) FROM cstore.cstore_data_files; ?column? ---------- 1 diff --git a/expected/fdw_drop.out b/expected/fdw_drop.out index 24c0f518d..e1ddf0fd0 100644 --- a/expected/fdw_drop.out +++ b/expected/fdw_drop.out @@ -12,12 +12,12 @@ -- 'postgres' directory is excluded from comparison to have the same result. -- store postgres database oid SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset -SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset +SELECT count(*) AS cstore_data_files_before_drop FROM cstore.cstore_data_files \gset -- DROP cstore_fdw tables DROP FOREIGN TABLE contestant; DROP FOREIGN TABLE contestant_compressed; -- make sure DROP deletes metadata -SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables; +SELECT :cstore_data_files_before_drop - count(*) FROM cstore.cstore_data_files; ?column? ---------- 2 @@ -26,10 +26,10 @@ SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables; -- Create a cstore_fdw table under a schema and drop it. CREATE SCHEMA test_schema; CREATE FOREIGN TABLE test_schema.test_table(data int) SERVER cstore_server; -SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset +SELECT count(*) AS cstore_data_files_before_drop FROM cstore.cstore_data_files \gset DROP SCHEMA test_schema CASCADE; NOTICE: drop cascades to foreign table test_schema.test_table -SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables; +SELECT :cstore_data_files_before_drop - count(*) FROM cstore.cstore_data_files; ?column? ---------- 1 diff --git a/sql/am_drop.sql b/sql/am_drop.sql index 06873aa6e..080712881 100644 --- a/sql/am_drop.sql +++ b/sql/am_drop.sql @@ -15,22 +15,22 @@ -- store postgres database oid SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset -SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset +SELECT count(*) AS cstore_data_files_before_drop FROM cstore.cstore_data_files \gset -- DROP cstore_fdw tables DROP TABLE contestant; DROP TABLE contestant_compressed; -- make sure DROP deletes metadata -SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables; +SELECT :cstore_data_files_before_drop - count(*) FROM cstore.cstore_data_files; -- Create a cstore_fdw table under a schema and drop it. CREATE SCHEMA test_schema; CREATE TABLE test_schema.test_table(data int) USING cstore_tableam; -SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset +SELECT count(*) AS cstore_data_files_before_drop FROM cstore.cstore_data_files \gset DROP SCHEMA test_schema CASCADE; -SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables; +SELECT :cstore_data_files_before_drop - count(*) FROM cstore.cstore_data_files; SELECT current_database() datname \gset diff --git a/sql/fdw_drop.sql b/sql/fdw_drop.sql index 7c6dd5c6e..f89374a5a 100644 --- a/sql/fdw_drop.sql +++ b/sql/fdw_drop.sql @@ -15,22 +15,22 @@ -- store postgres database oid SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset -SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset +SELECT count(*) AS cstore_data_files_before_drop FROM cstore.cstore_data_files \gset -- DROP cstore_fdw tables DROP FOREIGN TABLE contestant; DROP FOREIGN TABLE contestant_compressed; -- make sure DROP deletes metadata -SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables; +SELECT :cstore_data_files_before_drop - count(*) FROM cstore.cstore_data_files; -- Create a cstore_fdw table under a schema and drop it. CREATE SCHEMA test_schema; CREATE FOREIGN TABLE test_schema.test_table(data int) SERVER cstore_server; -SELECT count(*) AS cstore_tables_before_drop FROM cstore.cstore_tables \gset +SELECT count(*) AS cstore_data_files_before_drop FROM cstore.cstore_data_files \gset DROP SCHEMA test_schema CASCADE; -SELECT :cstore_tables_before_drop - count(*) FROM cstore.cstore_tables; +SELECT :cstore_data_files_before_drop - count(*) FROM cstore.cstore_data_files; SELECT current_database() datname \gset