diff --git a/cstore.h b/cstore.h index f88ebbdab..96fa1ed53 100644 --- a/cstore.h +++ b/cstore.h @@ -77,6 +77,7 @@ typedef struct StripeMetadata { uint64 fileOffset; uint64 dataLength; + uint32 columnCount; uint32 blockCount; uint32 blockRowCount; uint64 rowCount; @@ -186,19 +187,6 @@ typedef struct StripeBuffers } StripeBuffers; -/* - * StripeFooter represents a stripe's footer. In this footer, we keep three - * arrays of sizes. The number of elements in each of the arrays is equal - * to the number of columns. - */ -typedef struct StripeFooter -{ - uint32 columnCount; - uint64 *existsSizeArray; - uint64 *valueSizeArray; -} StripeFooter; - - /* TableReadState represents state of a cstore file read operation. */ typedef struct TableReadState { @@ -235,7 +223,6 @@ typedef struct TableWriteState TupleDesc tupleDescriptor; FmgrInfo **comparisonFunctionArray; uint64 currentFileOffset; - uint64 currentStripeOffset; Relation relation; MemoryContext stripeWriteContext; @@ -296,9 +283,6 @@ extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer, extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType); /* cstore_metadata_tables.c */ -extern void SaveStripeFooter(Oid relid, uint64 stripe, StripeFooter *footer); -extern StripeFooter * ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount); - extern void InitCStoreTableMetadata(Oid relid, int blockRowCount); extern void InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe); extern TableMetadata * ReadTableMetadata(Oid relid); diff --git a/cstore_fdw--1.7.sql b/cstore_fdw--1.7.sql index d98652b6d..84b69be07 100644 --- a/cstore_fdw--1.7.sql +++ b/cstore_fdw--1.7.sql @@ -73,6 +73,7 @@ CREATE TABLE cstore_stripes ( stripe bigint NOT NULL, file_offset bigint NOT NULL, data_length bigint NOT NULL, + column_count int NOT NULL, block_count int NOT NULL, block_row_count int NOT NULL, row_count bigint NOT NULL, @@ -82,18 +83,6 @@ CREATE TABLE cstore_stripes ( COMMENT ON TABLE cstore_tables IS 'CStore per stripe metadata'; -CREATE TABLE cstore_stripe_attr ( - relid oid NOT NULL, - stripe bigint NOT NULL, - attr int NOT NULL, - exists_size bigint NOT NULL, - value_size bigint NOT NULL, - PRIMARY KEY (relid, stripe, attr), - FOREIGN KEY (relid, stripe) REFERENCES cstore_stripes(relid, stripe) ON DELETE CASCADE INITIALLY DEFERRED -) WITH (user_catalog_table = true); - -COMMENT ON TABLE cstore_tables IS 'CStore per stripe/column combination metadata'; - CREATE TABLE cstore_skipnodes ( relid oid NOT NULL, stripe bigint NOT NULL, @@ -108,7 +97,7 @@ CREATE TABLE cstore_skipnodes ( exists_stream_length bigint NOT NULL, value_compression_type int NOT NULL, PRIMARY KEY (relid, stripe, attr, block), - FOREIGN KEY (relid, stripe, attr) REFERENCES cstore_stripe_attr(relid, stripe, attr) ON DELETE CASCADE INITIALLY DEFERRED + FOREIGN KEY (relid, stripe) REFERENCES cstore_stripes(relid, stripe) ON DELETE CASCADE INITIALLY DEFERRED ) WITH (user_catalog_table = true); COMMENT ON TABLE cstore_tables IS 'CStore per block metadata'; diff --git a/cstore_metadata_tables.c b/cstore_metadata_tables.c index 5285295b9..690e9eba9 100644 --- a/cstore_metadata_tables.c +++ b/cstore_metadata_tables.c @@ -43,8 +43,6 @@ typedef struct EState *estate; } ModifyState; -static Oid CStoreStripeAttrRelationId(void); -static Oid CStoreStripeAttrIndexRelationId(void); static Oid CStoreStripesRelationId(void); static Oid CStoreStripesIndexRelationId(void); static Oid CStoreTablesRelationId(void); @@ -63,14 +61,6 @@ static EState * create_estate_for_relation(Relation rel); static bytea * DatumToBytea(Datum value, Form_pg_attribute attrForm); static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm); -/* constants for cstore_stripe_attr */ -#define Natts_cstore_stripe_attr 5 -#define Anum_cstore_stripe_attr_relid 1 -#define Anum_cstore_stripe_attr_stripe 2 -#define Anum_cstore_stripe_attr_attr 3 -#define Anum_cstore_stripe_attr_exists_size 4 -#define Anum_cstore_stripe_attr_value_size 5 - /* constants for cstore_table */ #define Natts_cstore_tables 4 #define Anum_cstore_tables_relid 1 @@ -79,14 +69,15 @@ static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm); #define Anum_cstore_tables_version_minor 4 /* constants for cstore_stripe */ -#define Natts_cstore_stripes 7 +#define Natts_cstore_stripes 8 #define Anum_cstore_stripes_relid 1 #define Anum_cstore_stripes_stripe 2 #define Anum_cstore_stripes_file_offset 3 #define Anum_cstore_stripes_data_length 4 -#define Anum_cstore_stripes_block_count 5 -#define Anum_cstore_stripes_block_row_count 6 -#define Anum_cstore_stripes_row_count 7 +#define Anum_cstore_stripes_column_count 5 +#define Anum_cstore_stripes_block_count 6 +#define Anum_cstore_stripes_block_row_count 7 +#define Anum_cstore_stripes_row_count 8 /* constants for cstore_skipnodes */ #define Natts_cstore_skipnodes 12 @@ -328,6 +319,7 @@ InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe) Int64GetDatum(stripe->id), Int64GetDatum(stripe->fileOffset), Int64GetDatum(stripe->dataLength), + Int32GetDatum(stripe->columnCount), Int32GetDatum(stripe->blockCount), Int32GetDatum(stripe->blockRowCount), Int64GetDatum(stripe->rowCount) @@ -388,6 +380,8 @@ ReadTableMetadata(Oid relid) datumArray[Anum_cstore_stripes_file_offset - 1]); stripeMetadata->dataLength = DatumGetInt64( datumArray[Anum_cstore_stripes_data_length - 1]); + stripeMetadata->columnCount = DatumGetInt32( + datumArray[Anum_cstore_stripes_column_count - 1]); stripeMetadata->blockCount = DatumGetInt32( datumArray[Anum_cstore_stripes_block_count - 1]); stripeMetadata->blockRowCount = DatumGetInt32( @@ -485,103 +479,6 @@ DeleteTableMetadataRowIfExists(Oid relid) } -/* - * SaveStripeFooter stores give StripeFooter as cstore_stripe_attr records. - */ -void -SaveStripeFooter(Oid relid, uint64 stripe, StripeFooter *footer) -{ - Oid cstoreStripeAttrOid = CStoreStripeAttrRelationId(); - Relation cstoreStripeAttrs = heap_open(cstoreStripeAttrOid, RowExclusiveLock); - - ModifyState *modifyState = StartModifyRelation(cstoreStripeAttrs); - - for (AttrNumber attr = 1; attr <= footer->columnCount; attr++) - { - bool nulls[Natts_cstore_stripe_attr] = { 0 }; - Datum values[Natts_cstore_stripe_attr] = { - ObjectIdGetDatum(relid), - Int64GetDatum(stripe), - Int16GetDatum(attr), - Int64GetDatum(footer->existsSizeArray[attr - 1]), - Int64GetDatum(footer->valueSizeArray[attr - 1]) - }; - - InsertTupleAndEnforceConstraints(modifyState, values, nulls); - } - - FinishModifyRelation(modifyState); - heap_close(cstoreStripeAttrs, NoLock); -} - - -/* - * ReadStripeFooter returns a StripeFooter by reading relevant records from - * cstore_stripe_attr. - */ -StripeFooter * -ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount) -{ - StripeFooter *footer = NULL; - HeapTuple heapTuple; - - Oid cstoreStripeAttrOid = CStoreStripeAttrRelationId(); - Relation cstoreStripeAttrs = heap_open(cstoreStripeAttrOid, AccessShareLock); - Relation index = index_open(CStoreStripeAttrIndexRelationId(), AccessShareLock); - TupleDesc tupleDescriptor = RelationGetDescr(cstoreStripeAttrs); - - SysScanDesc scanDescriptor = NULL; - ScanKeyData scanKey[2]; - ScanKeyInit(&scanKey[0], Anum_cstore_stripe_attr_relid, - BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relid)); - ScanKeyInit(&scanKey[1], Anum_cstore_stripe_attr_stripe, - BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(stripe)); - - scanDescriptor = systable_beginscan_ordered(cstoreStripeAttrs, index, NULL, 2, - scanKey); - - footer = palloc0(sizeof(StripeFooter)); - footer->existsSizeArray = palloc0(relationColumnCount * sizeof(int64)); - footer->valueSizeArray = palloc0(relationColumnCount * sizeof(int64)); - - /* - * Stripe can have less columns than the relation if ALTER TABLE happens - * after stripe is formed. So we calculate column count of a stripe as - * maximum attribute number for that stripe. - */ - footer->columnCount = 0; - - while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) - { - Datum datumArray[Natts_cstore_stripe_attr]; - bool isNullArray[Natts_cstore_stripe_attr]; - AttrNumber attr = 0; - - heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); - attr = DatumGetInt16(datumArray[2]); - - footer->columnCount = Max(footer->columnCount, attr); - - while (attr > relationColumnCount) - { - ereport(ERROR, (errmsg("unexpected attribute %d for a relation with %d attrs", - attr, relationColumnCount))); - } - - footer->existsSizeArray[attr - 1] = - DatumGetInt64(datumArray[Anum_cstore_stripe_attr_exists_size - 1]); - footer->valueSizeArray[attr - 1] = - DatumGetInt64(datumArray[Anum_cstore_stripe_attr_value_size - 1]); - } - - systable_endscan_ordered(scanDescriptor); - index_close(index, NoLock); - heap_close(cstoreStripeAttrs, NoLock); - - return footer; -} - - /* * StartModifyRelation allocates resources for modifications. */ @@ -760,28 +657,6 @@ ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm) } -/* - * CStoreStripeAttrRelationId returns relation id of cstore_stripe_attr. - * TODO: should we cache this similar to citus? - */ -static Oid -CStoreStripeAttrRelationId(void) -{ - return get_relname_relid("cstore_stripe_attr", CStoreNamespaceId()); -} - - -/* - * CStoreStripeAttrRelationId returns relation id of cstore_stripe_attr_pkey. - * TODO: should we cache this similar to citus? - */ -static Oid -CStoreStripeAttrIndexRelationId(void) -{ - return get_relname_relid("cstore_stripe_attr_pkey", CStoreNamespaceId()); -} - - /* * CStoreStripesRelationId returns relation id of cstore_stripes. * TODO: should we cache this similar to citus? diff --git a/cstore_reader.c b/cstore_reader.c index 6b5d7ed00..25702b272 100644 --- a/cstore_reader.c +++ b/cstore_reader.c @@ -39,7 +39,6 @@ /* static function declarations */ static StripeBuffers * LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata, - StripeFooter *stripeFooter, TupleDesc tupleDescriptor, List *projectedColumnList, List *whereClauseList); @@ -141,7 +140,6 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu StripeMetadata *stripeMetadata = NULL; List *stripeMetadataList = readState->tableMetadata->stripeMetadataList; uint32 stripeCount = list_length(stripeMetadataList); - StripeFooter *stripeFooter = NULL; /* if we have read all stripes, return false */ if (readState->readStripeCount == stripeCount) @@ -154,12 +152,8 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu readState->blockData = NULL; stripeMetadata = list_nth(stripeMetadataList, readState->readStripeCount); - stripeFooter = ReadStripeFooter(readState->relationId, - stripeMetadata->id, - readState->tupleDescriptor->natts); stripeBuffers = LoadFilteredStripeBuffers(readState->relation, stripeMetadata, - stripeFooter, readState->tupleDescriptor, readState->projectedColumnList, readState->whereClauseList); @@ -333,12 +327,11 @@ CStoreTableRowCount(Relation relation) */ static StripeBuffers * LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata, - StripeFooter *stripeFooter, TupleDesc tupleDescriptor, - List *projectedColumnList, List *whereClauseList) + TupleDesc tupleDescriptor, List *projectedColumnList, + List *whereClauseList) { StripeBuffers *stripeBuffers = NULL; ColumnBuffers **columnBuffersArray = NULL; - uint64 currentColumnFileOffset = 0; uint32 columnIndex = 0; uint32 columnCount = tupleDescriptor->natts; @@ -358,13 +351,9 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata, /* load column data for projected columns */ columnBuffersArray = palloc0(columnCount * sizeof(ColumnBuffers *)); - currentColumnFileOffset = stripeMetadata->fileOffset; - for (columnIndex = 0; columnIndex < stripeFooter->columnCount; columnIndex++) + for (columnIndex = 0; columnIndex < stripeMetadata->columnCount; columnIndex++) { - uint64 existsSize = stripeFooter->existsSizeArray[columnIndex]; - uint64 valueSize = stripeFooter->valueSizeArray[columnIndex]; - if (projectedColumnMask[columnIndex]) { ColumnBlockSkipNode *blockSkipNode = @@ -379,9 +368,6 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata, columnBuffersArray[columnIndex] = columnBuffers; } - - currentColumnFileOffset += existsSize; - currentColumnFileOffset += valueSize; } stripeBuffers = palloc0(sizeof(StripeBuffers)); diff --git a/cstore_writer.c b/cstore_writer.c index 65871b511..91e73ffa8 100644 --- a/cstore_writer.c +++ b/cstore_writer.c @@ -34,7 +34,6 @@ static StripeSkipList * CreateEmptyStripeSkipList(uint32 stripeMaxRowCount, uint32 blockRowCount, uint32 columnCount); static StripeMetadata FlushStripe(TableWriteState *writeState); -static StripeFooter * CreateStripeFooter(StripeSkipList *stripeSkipList); static StringInfo SerializeBoolArray(bool *boolArray, uint32 boolArrayLength); static void SerializeSingleDatum(StringInfo datumBuffer, Datum datum, bool datumTypeByValue, int datumTypeLength, @@ -242,7 +241,6 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul MemoryContextReset(writeState->stripeWriteContext); writeState->currentStripeId++; - writeState->currentStripeOffset = 0; /* set stripe data and skip list to NULL so they are recreated next time */ writeState->stripeBuffers = NULL; @@ -450,8 +448,6 @@ static StripeMetadata FlushStripe(TableWriteState *writeState) { StripeMetadata stripeMetadata = { 0 }; - uint64 dataLength = 0; - StripeFooter *stripeFooter = NULL; uint32 columnIndex = 0; uint32 blockIndex = 0; StripeBuffers *stripeBuffers = writeState->stripeBuffers; @@ -464,6 +460,7 @@ FlushStripe(TableWriteState *writeState) uint32 lastBlockIndex = stripeBuffers->rowCount / blockRowCount; uint32 lastBlockRowCount = stripeBuffers->rowCount % blockRowCount; uint64 initialFileOffset = writeState->currentFileOffset; + uint64 stripeSize = 0; /* * check if the last block needs serialization , the last block was not serialized @@ -487,16 +484,10 @@ FlushStripe(TableWriteState *writeState) uint64 existsBufferSize = blockBuffers->existsBuffer->len; ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex]; - blockSkipNode->existsBlockOffset = writeState->currentStripeOffset; + blockSkipNode->existsBlockOffset = stripeSize; blockSkipNode->existsLength = existsBufferSize; - writeState->currentStripeOffset += existsBufferSize; + stripeSize += existsBufferSize; } - } - - for (columnIndex = 0; columnIndex < columnCount; columnIndex++) - { - ColumnBlockSkipNode *blockSkipNodeArray = columnSkipNodeArray[columnIndex]; - ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex]; for (blockIndex = 0; blockIndex < blockCount; blockIndex++) { @@ -506,20 +497,14 @@ FlushStripe(TableWriteState *writeState) CompressionType valueCompressionType = blockBuffers->valueCompressionType; ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex]; - blockSkipNode->valueBlockOffset = writeState->currentStripeOffset; + blockSkipNode->valueBlockOffset = stripeSize; blockSkipNode->valueLength = valueBufferSize; blockSkipNode->valueCompressionType = valueCompressionType; - writeState->currentStripeOffset += valueBufferSize; + stripeSize += valueBufferSize; } } - - /* create skip list and footer buffers */ - SaveStripeSkipList(writeState->relationId, writeState->currentStripeId, - stripeSkipList, tupleDescriptor); - stripeFooter = CreateStripeFooter(stripeSkipList); - /* * Each stripe has only one section: * Data section, in which we store data for each column continuously. @@ -557,17 +542,9 @@ FlushStripe(TableWriteState *writeState) } } - /* finally, we flush the footer buffer */ - SaveStripeFooter(writeState->relationId, - writeState->currentStripeId, - stripeFooter); - - /* set stripe metadata */ - for (columnIndex = 0; columnIndex < columnCount; columnIndex++) - { - dataLength += stripeFooter->existsSizeArray[columnIndex]; - dataLength += stripeFooter->valueSizeArray[columnIndex]; - } + /* create skip list and footer buffers */ + SaveStripeSkipList(writeState->relationId, writeState->currentStripeId, + stripeSkipList, tupleDescriptor); for (blockIndex = 0; blockIndex < blockCount; blockIndex++) { @@ -576,47 +553,16 @@ FlushStripe(TableWriteState *writeState) } stripeMetadata.fileOffset = initialFileOffset; - stripeMetadata.dataLength = dataLength; + stripeMetadata.dataLength = stripeSize; stripeMetadata.id = writeState->currentStripeId; stripeMetadata.blockCount = blockCount; stripeMetadata.blockRowCount = writeState->blockRowCount; + stripeMetadata.columnCount = columnCount; return stripeMetadata; } -/* Creates and returns the footer for given stripe. */ -static StripeFooter * -CreateStripeFooter(StripeSkipList *stripeSkipList) -{ - StripeFooter *stripeFooter = NULL; - uint32 columnIndex = 0; - uint32 columnCount = stripeSkipList->columnCount; - uint64 *existsSizeArray = palloc0(columnCount * sizeof(uint64)); - uint64 *valueSizeArray = palloc0(columnCount * sizeof(uint64)); - - for (columnIndex = 0; columnIndex < columnCount; columnIndex++) - { - ColumnBlockSkipNode *blockSkipNodeArray = - stripeSkipList->blockSkipNodeArray[columnIndex]; - uint32 blockIndex = 0; - - for (blockIndex = 0; blockIndex < stripeSkipList->blockCount; blockIndex++) - { - existsSizeArray[columnIndex] += blockSkipNodeArray[blockIndex].existsLength; - valueSizeArray[columnIndex] += blockSkipNodeArray[blockIndex].valueLength; - } - } - - stripeFooter = palloc0(sizeof(StripeFooter)); - stripeFooter->columnCount = columnCount; - stripeFooter->existsSizeArray = existsSizeArray; - stripeFooter->valueSizeArray = valueSizeArray; - - return stripeFooter; -} - - /* * SerializeBoolArray serializes the given boolean array and returns the result * as a StringInfo. This function packs every 8 boolean values into one byte.