diff --git a/.gitignore b/.gitignore index 0c643e590..aa7be0e36 100644 --- a/.gitignore +++ b/.gitignore @@ -42,17 +42,17 @@ /data/*.cstore /data/*.footer -/sql/block_filtering.sql -/sql/copyto.sql -/sql/create.sql -/sql/data_types.sql -/sql/load.sql +/sql/*block_filtering.sql +/sql/*copyto.sql +/sql/*create.sql +/sql/*data_types.sql +/sql/*load.sql -/expected/block_filtering.out -/expected/copyto.out -/expected/create.out -/expected/data_types.out -/expected/load.out +/expected/*block_filtering.out +/expected/*copyto.out +/expected/*create.out +/expected/*data_types.out +/expected/*load.out /results/* /.deps/* /regression.diffs diff --git a/cstore.h b/cstore.h index ad0ad20bd..96fa1ed53 100644 --- a/cstore.h +++ b/cstore.h @@ -77,7 +77,9 @@ typedef struct StripeMetadata { uint64 fileOffset; uint64 dataLength; + uint32 columnCount; uint32 blockCount; + uint32 blockRowCount; uint64 rowCount; uint64 id; } StripeMetadata; @@ -128,20 +130,27 @@ typedef struct StripeSkipList /* - * ColumnBlockData represents a block of data in a column. valueArray stores + * BlockData represents a block of data for multiple columns. valueArray stores * the values of data, and existsArray stores whether a value is present. * valueBuffer is used to store (uncompressed) serialized values * referenced by Datum's in valueArray. It is only used for by-reference Datum's. * There is a one-to-one correspondence between valueArray and existsArray. */ -typedef struct ColumnBlockData +typedef struct BlockData { - bool *existsArray; - Datum *valueArray; + uint32 rowCount; + uint32 columnCount; + + /* + * Following are indexed by [column][row]. If a column is not projected, + * then existsArray[column] and valueArray[column] are NULL. + */ + bool **existsArray; + Datum **valueArray; /* valueBuffer keeps actual data for type-by-reference datums from valueArray. 
*/ - StringInfo valueBuffer; -} ColumnBlockData; + StringInfo *valueBufferArray; +} BlockData; /* @@ -178,25 +187,13 @@ typedef struct StripeBuffers } StripeBuffers; -/* - * StripeFooter represents a stripe's footer. In this footer, we keep three - * arrays of sizes. The number of elements in each of the arrays is equal - * to the number of columns. - */ -typedef struct StripeFooter -{ - uint32 columnCount; - uint64 *existsSizeArray; - uint64 *valueSizeArray; -} StripeFooter; - - /* TableReadState represents state of a cstore file read operation. */ typedef struct TableReadState { Oid relationId; TableMetadata *tableMetadata; + StripeMetadata *currentStripeMetadata; TupleDesc tupleDescriptor; Relation relation; @@ -212,7 +209,7 @@ typedef struct TableReadState StripeBuffers *stripeBuffers; uint32 readStripeCount; uint64 stripeReadRowCount; - ColumnBlockData **blockDataArray; + BlockData *blockData; int32 deserializedBlockIndex; } TableReadState; @@ -233,7 +230,8 @@ typedef struct TableWriteState StripeBuffers *stripeBuffers; StripeSkipList *stripeSkipList; uint32 stripeMaxRowCount; - ColumnBlockData **blockDataArray; + uint32 blockRowCount; + BlockData *blockData; /* * compressionBuffer buffer is used as temporary storage during @@ -276,19 +274,15 @@ extern void CStoreEndRead(TableReadState *state); /* Function declarations for common functions */ extern FmgrInfo * GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId, int16 procedureId); -extern ColumnBlockData ** CreateEmptyBlockDataArray(uint32 columnCount, bool *columnMask, - uint32 blockRowCount); -extern void FreeColumnBlockDataArray(ColumnBlockData **blockDataArray, - uint32 columnCount); +extern BlockData * CreateEmptyBlockData(uint32 columnCount, bool *columnMask, + uint32 blockRowCount); +extern void FreeBlockData(BlockData *blockData); extern uint64 CStoreTableRowCount(Relation relation); extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer, CompressionType compressionType); extern 
StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType); /* cstore_metadata_tables.c */ -extern void SaveStripeFooter(Oid relid, uint64 stripe, StripeFooter *footer); -extern StripeFooter * ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount); - extern void InitCStoreTableMetadata(Oid relid, int blockRowCount); extern void InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe); extern TableMetadata * ReadTableMetadata(Oid relid); diff --git a/cstore_fdw--1.7.sql b/cstore_fdw--1.7.sql index 19801f1f8..84b69be07 100644 --- a/cstore_fdw--1.7.sql +++ b/cstore_fdw--1.7.sql @@ -73,7 +73,9 @@ CREATE TABLE cstore_stripes ( stripe bigint NOT NULL, file_offset bigint NOT NULL, data_length bigint NOT NULL, + column_count int NOT NULL, block_count int NOT NULL, + block_row_count int NOT NULL, row_count bigint NOT NULL, PRIMARY KEY (relid, stripe), FOREIGN KEY (relid) REFERENCES cstore_tables(relid) ON DELETE CASCADE INITIALLY DEFERRED @@ -81,18 +83,6 @@ CREATE TABLE cstore_stripes ( COMMENT ON TABLE cstore_tables IS 'CStore per stripe metadata'; -CREATE TABLE cstore_stripe_attr ( - relid oid NOT NULL, - stripe bigint NOT NULL, - attr int NOT NULL, - exists_size bigint NOT NULL, - value_size bigint NOT NULL, - PRIMARY KEY (relid, stripe, attr), - FOREIGN KEY (relid, stripe) REFERENCES cstore_stripes(relid, stripe) ON DELETE CASCADE INITIALLY DEFERRED -) WITH (user_catalog_table = true); - -COMMENT ON TABLE cstore_tables IS 'CStore per stripe/column combination metadata'; - CREATE TABLE cstore_skipnodes ( relid oid NOT NULL, stripe bigint NOT NULL, @@ -107,7 +97,7 @@ CREATE TABLE cstore_skipnodes ( exists_stream_length bigint NOT NULL, value_compression_type int NOT NULL, PRIMARY KEY (relid, stripe, attr, block), - FOREIGN KEY (relid, stripe, attr) REFERENCES cstore_stripe_attr(relid, stripe, attr) ON DELETE CASCADE INITIALLY DEFERRED + FOREIGN KEY (relid, stripe) REFERENCES cstore_stripes(relid, stripe) ON DELETE CASCADE INITIALLY 
DEFERRED ) WITH (user_catalog_table = true); COMMENT ON TABLE cstore_tables IS 'CStore per block metadata'; diff --git a/cstore_metadata_tables.c b/cstore_metadata_tables.c index 8a67a3a9e..690e9eba9 100644 --- a/cstore_metadata_tables.c +++ b/cstore_metadata_tables.c @@ -43,8 +43,6 @@ typedef struct EState *estate; } ModifyState; -static Oid CStoreStripeAttrRelationId(void); -static Oid CStoreStripeAttrIndexRelationId(void); static Oid CStoreStripesRelationId(void); static Oid CStoreStripesIndexRelationId(void); static Oid CStoreTablesRelationId(void); @@ -63,14 +61,6 @@ static EState * create_estate_for_relation(Relation rel); static bytea * DatumToBytea(Datum value, Form_pg_attribute attrForm); static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm); -/* constants for cstore_stripe_attr */ -#define Natts_cstore_stripe_attr 5 -#define Anum_cstore_stripe_attr_relid 1 -#define Anum_cstore_stripe_attr_stripe 2 -#define Anum_cstore_stripe_attr_attr 3 -#define Anum_cstore_stripe_attr_exists_size 4 -#define Anum_cstore_stripe_attr_value_size 5 - /* constants for cstore_table */ #define Natts_cstore_tables 4 #define Anum_cstore_tables_relid 1 @@ -79,13 +69,15 @@ static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm); #define Anum_cstore_tables_version_minor 4 /* constants for cstore_stripe */ -#define Natts_cstore_stripes 6 +#define Natts_cstore_stripes 8 #define Anum_cstore_stripes_relid 1 #define Anum_cstore_stripes_stripe 2 #define Anum_cstore_stripes_file_offset 3 #define Anum_cstore_stripes_data_length 4 -#define Anum_cstore_stripes_block_count 5 -#define Anum_cstore_stripes_row_count 6 +#define Anum_cstore_stripes_column_count 5 +#define Anum_cstore_stripes_block_count 6 +#define Anum_cstore_stripes_block_row_count 7 +#define Anum_cstore_stripes_row_count 8 /* constants for cstore_skipnodes */ #define Natts_cstore_skipnodes 12 @@ -327,7 +319,9 @@ InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe) Int64GetDatum(stripe->id), 
Int64GetDatum(stripe->fileOffset), Int64GetDatum(stripe->dataLength), + Int32GetDatum(stripe->columnCount), Int32GetDatum(stripe->blockCount), + Int32GetDatum(stripe->blockRowCount), Int64GetDatum(stripe->rowCount) }; @@ -386,8 +380,12 @@ ReadTableMetadata(Oid relid) datumArray[Anum_cstore_stripes_file_offset - 1]); stripeMetadata->dataLength = DatumGetInt64( datumArray[Anum_cstore_stripes_data_length - 1]); + stripeMetadata->columnCount = DatumGetInt32( + datumArray[Anum_cstore_stripes_column_count - 1]); stripeMetadata->blockCount = DatumGetInt32( datumArray[Anum_cstore_stripes_block_count - 1]); + stripeMetadata->blockRowCount = DatumGetInt32( + datumArray[Anum_cstore_stripes_block_row_count - 1]); stripeMetadata->rowCount = DatumGetInt64( datumArray[Anum_cstore_stripes_row_count - 1]); @@ -481,103 +479,6 @@ DeleteTableMetadataRowIfExists(Oid relid) } -/* - * SaveStripeFooter stores give StripeFooter as cstore_stripe_attr records. - */ -void -SaveStripeFooter(Oid relid, uint64 stripe, StripeFooter *footer) -{ - Oid cstoreStripeAttrOid = CStoreStripeAttrRelationId(); - Relation cstoreStripeAttrs = heap_open(cstoreStripeAttrOid, RowExclusiveLock); - - ModifyState *modifyState = StartModifyRelation(cstoreStripeAttrs); - - for (AttrNumber attr = 1; attr <= footer->columnCount; attr++) - { - bool nulls[Natts_cstore_stripe_attr] = { 0 }; - Datum values[Natts_cstore_stripe_attr] = { - ObjectIdGetDatum(relid), - Int64GetDatum(stripe), - Int16GetDatum(attr), - Int64GetDatum(footer->existsSizeArray[attr - 1]), - Int64GetDatum(footer->valueSizeArray[attr - 1]) - }; - - InsertTupleAndEnforceConstraints(modifyState, values, nulls); - } - - FinishModifyRelation(modifyState); - heap_close(cstoreStripeAttrs, NoLock); -} - - -/* - * ReadStripeFooter returns a StripeFooter by reading relevant records from - * cstore_stripe_attr. 
- */ -StripeFooter * -ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount) -{ - StripeFooter *footer = NULL; - HeapTuple heapTuple; - - Oid cstoreStripeAttrOid = CStoreStripeAttrRelationId(); - Relation cstoreStripeAttrs = heap_open(cstoreStripeAttrOid, AccessShareLock); - Relation index = index_open(CStoreStripeAttrIndexRelationId(), AccessShareLock); - TupleDesc tupleDescriptor = RelationGetDescr(cstoreStripeAttrs); - - SysScanDesc scanDescriptor = NULL; - ScanKeyData scanKey[2]; - ScanKeyInit(&scanKey[0], Anum_cstore_stripe_attr_relid, - BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relid)); - ScanKeyInit(&scanKey[1], Anum_cstore_stripe_attr_stripe, - BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(stripe)); - - scanDescriptor = systable_beginscan_ordered(cstoreStripeAttrs, index, NULL, 2, - scanKey); - - footer = palloc0(sizeof(StripeFooter)); - footer->existsSizeArray = palloc0(relationColumnCount * sizeof(int64)); - footer->valueSizeArray = palloc0(relationColumnCount * sizeof(int64)); - - /* - * Stripe can have less columns than the relation if ALTER TABLE happens - * after stripe is formed. So we calculate column count of a stripe as - * maximum attribute number for that stripe. 
- */ - footer->columnCount = 0; - - while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) - { - Datum datumArray[Natts_cstore_stripe_attr]; - bool isNullArray[Natts_cstore_stripe_attr]; - AttrNumber attr = 0; - - heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); - attr = DatumGetInt16(datumArray[2]); - - footer->columnCount = Max(footer->columnCount, attr); - - while (attr > relationColumnCount) - { - ereport(ERROR, (errmsg("unexpected attribute %d for a relation with %d attrs", - attr, relationColumnCount))); - } - - footer->existsSizeArray[attr - 1] = - DatumGetInt64(datumArray[Anum_cstore_stripe_attr_exists_size - 1]); - footer->valueSizeArray[attr - 1] = - DatumGetInt64(datumArray[Anum_cstore_stripe_attr_value_size - 1]); - } - - systable_endscan_ordered(scanDescriptor); - index_close(index, NoLock); - heap_close(cstoreStripeAttrs, NoLock); - - return footer; -} - - /* * StartModifyRelation allocates resources for modifications. */ @@ -756,28 +657,6 @@ ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm) } -/* - * CStoreStripeAttrRelationId returns relation id of cstore_stripe_attr. - * TODO: should we cache this similar to citus? - */ -static Oid -CStoreStripeAttrRelationId(void) -{ - return get_relname_relid("cstore_stripe_attr", CStoreNamespaceId()); -} - - -/* - * CStoreStripeAttrRelationId returns relation id of cstore_stripe_attr_pkey. - * TODO: should we cache this similar to citus? - */ -static Oid -CStoreStripeAttrIndexRelationId(void) -{ - return get_relname_relid("cstore_stripe_attr_pkey", CStoreNamespaceId()); -} - - /* * CStoreStripesRelationId returns relation id of cstore_stripes. * TODO: should we cache this similar to citus? 
diff --git a/cstore_reader.c b/cstore_reader.c index fecb45605..25702b272 100644 --- a/cstore_reader.c +++ b/cstore_reader.c @@ -39,18 +39,16 @@ /* static function declarations */ static StripeBuffers * LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata, - StripeFooter *stripeFooter, TupleDesc tupleDescriptor, List *projectedColumnList, List *whereClauseList); static void ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColumnList, uint64 blockIndex, uint64 blockRowIndex, - ColumnBlockData **blockDataArray, - Datum *columnValues, bool *columnNulls); + BlockData *blockData, Datum *columnValues, + bool *columnNulls); static ColumnBuffers * LoadColumnBuffers(Relation relation, ColumnBlockSkipNode *blockSkipNodeArray, - uint32 blockCount, uint64 existsFileOffset, - uint64 valueFileOffset, + uint32 blockCount, uint64 stripeOffset, Form_pg_attribute attributeForm); static bool * SelectedBlockMask(StripeSkipList *stripeSkipList, List *projectedColumnList, List *whereClauseList); @@ -70,15 +68,12 @@ static void DeserializeDatumArray(StringInfo datumBuffer, bool *existsArray, uint32 datumCount, bool datumTypeByValue, int datumTypeLength, char datumTypeAlign, Datum *datumArray); -static void DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex, - uint32 rowCount, ColumnBlockData **blockDataArray, - TupleDesc tupleDescriptor); +static BlockData * DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex, + uint32 rowCount, TupleDesc tupleDescriptor, + List *projectedColumnList); static Datum ColumnDefaultValue(TupleConstr *tupleConstraints, Form_pg_attribute attributeForm); static StringInfo ReadFromSmgr(Relation rel, uint64 offset, uint32 size); -static void ResetUncompressedBlockData(ColumnBlockData **blockDataArray, - uint32 columnCount); - /* * CStoreBeginRead initializes a cstore read operation. 
This function returns a @@ -91,9 +86,6 @@ CStoreBeginRead(Oid relationId, TupleDesc tupleDescriptor, TableReadState *readState = NULL; TableMetadata *tableMetadata = NULL; MemoryContext stripeReadContext = NULL; - uint32 columnCount = 0; - bool *projectedColumnMask = NULL; - ColumnBlockData **blockDataArray = NULL; tableMetadata = ReadTableMetadata(relationId); @@ -106,11 +98,6 @@ CStoreBeginRead(Oid relationId, TupleDesc tupleDescriptor, "Stripe Read Memory Context", ALLOCSET_DEFAULT_SIZES); - columnCount = tupleDescriptor->natts; - projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList); - blockDataArray = CreateEmptyBlockDataArray(columnCount, projectedColumnMask, - tableMetadata->blockRowCount); - readState = palloc0(sizeof(TableReadState)); readState->relationId = relationId; readState->tableMetadata = tableMetadata; @@ -121,7 +108,7 @@ CStoreBeginRead(Oid relationId, TupleDesc tupleDescriptor, readState->stripeReadRowCount = 0; readState->tupleDescriptor = tupleDescriptor; readState->stripeReadContext = stripeReadContext; - readState->blockDataArray = blockDataArray; + readState->blockData = NULL; readState->deserializedBlockIndex = -1; return readState; @@ -138,7 +125,7 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu { uint32 blockIndex = 0; uint32 blockRowIndex = 0; - TableMetadata *tableMetadata = readState->tableMetadata; + StripeMetadata *stripeMetadata = readState->currentStripeMetadata; MemoryContext oldContext = NULL; /* @@ -151,9 +138,8 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu { StripeBuffers *stripeBuffers = NULL; StripeMetadata *stripeMetadata = NULL; - List *stripeMetadataList = tableMetadata->stripeMetadataList; + List *stripeMetadataList = readState->tableMetadata->stripeMetadataList; uint32 stripeCount = list_length(stripeMetadataList); - StripeFooter *stripeFooter = NULL; /* if we have read all stripes, return false */ if 
(readState->readStripeCount == stripeCount) @@ -163,18 +149,16 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu oldContext = MemoryContextSwitchTo(readState->stripeReadContext); MemoryContextReset(readState->stripeReadContext); + readState->blockData = NULL; stripeMetadata = list_nth(stripeMetadataList, readState->readStripeCount); - stripeFooter = ReadStripeFooter(readState->relationId, - stripeMetadata->id, - readState->tupleDescriptor->natts); stripeBuffers = LoadFilteredStripeBuffers(readState->relation, stripeMetadata, - stripeFooter, readState->tupleDescriptor, readState->projectedColumnList, readState->whereClauseList); readState->readStripeCount++; + readState->currentStripeMetadata = stripeMetadata; MemoryContextSwitchTo(oldContext); @@ -183,37 +167,38 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu readState->stripeBuffers = stripeBuffers; readState->stripeReadRowCount = 0; readState->deserializedBlockIndex = -1; - ResetUncompressedBlockData(readState->blockDataArray, - stripeBuffers->columnCount); break; } } - blockIndex = readState->stripeReadRowCount / tableMetadata->blockRowCount; - blockRowIndex = readState->stripeReadRowCount % tableMetadata->blockRowCount; + stripeMetadata = readState->currentStripeMetadata; + blockIndex = readState->stripeReadRowCount / stripeMetadata->blockRowCount; + blockRowIndex = readState->stripeReadRowCount % stripeMetadata->blockRowCount; if (blockIndex != readState->deserializedBlockIndex) { uint32 lastBlockIndex = 0; uint32 blockRowCount = 0; uint32 stripeRowCount = 0; - stripeRowCount = readState->stripeBuffers->rowCount; - lastBlockIndex = stripeRowCount / tableMetadata->blockRowCount; + stripeRowCount = stripeMetadata->rowCount; + lastBlockIndex = stripeRowCount / stripeMetadata->blockRowCount; if (blockIndex == lastBlockIndex) { - blockRowCount = stripeRowCount % tableMetadata->blockRowCount; + blockRowCount = stripeRowCount % 
stripeMetadata->blockRowCount; } else { - blockRowCount = tableMetadata->blockRowCount; + blockRowCount = stripeMetadata->blockRowCount; } oldContext = MemoryContextSwitchTo(readState->stripeReadContext); - DeserializeBlockData(readState->stripeBuffers, blockIndex, - blockRowCount, readState->blockDataArray, - readState->tupleDescriptor); + FreeBlockData(readState->blockData); + readState->blockData = + DeserializeBlockData(readState->stripeBuffers, blockIndex, + blockRowCount, readState->tupleDescriptor, + readState->projectedColumnList); MemoryContextSwitchTo(oldContext); @@ -221,7 +206,7 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu } ReadStripeNextRow(readState->stripeBuffers, readState->projectedColumnList, - blockIndex, blockRowIndex, readState->blockDataArray, + blockIndex, blockRowIndex, readState->blockData, columnValues, columnNulls); /* @@ -242,11 +227,8 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu void CStoreEndRead(TableReadState *readState) { - int columnCount = readState->tupleDescriptor->natts; - MemoryContextDelete(readState->stripeReadContext); list_free_deep(readState->tableMetadata->stripeMetadataList); - FreeColumnBlockDataArray(readState->blockDataArray, columnCount); pfree(readState->tableMetadata); pfree(readState); } @@ -256,52 +238,65 @@ CStoreEndRead(TableReadState *readState) * CreateEmptyBlockDataArray creates data buffers to keep deserialized exist and * value arrays for requested columns in columnMask. 
*/ -ColumnBlockData ** -CreateEmptyBlockDataArray(uint32 columnCount, bool *columnMask, uint32 blockRowCount) +BlockData * +CreateEmptyBlockData(uint32 columnCount, bool *columnMask, uint32 blockRowCount) { uint32 columnIndex = 0; - ColumnBlockData **blockDataArray = palloc0(columnCount * sizeof(ColumnBlockData *)); + + BlockData *blockData = palloc0(sizeof(BlockData)); + blockData->existsArray = palloc0(columnCount * sizeof(bool *)); + blockData->valueArray = palloc0(columnCount * sizeof(Datum *)); + blockData->valueBufferArray = palloc0(columnCount * sizeof(StringInfo)); + blockData->columnCount = columnCount; + blockData->rowCount = blockRowCount; /* allocate block memory for deserialized data */ for (columnIndex = 0; columnIndex < columnCount; columnIndex++) { if (columnMask[columnIndex]) { - ColumnBlockData *blockData = palloc0(sizeof(ColumnBlockData)); - - blockData->existsArray = palloc0(blockRowCount * sizeof(bool)); - blockData->valueArray = palloc0(blockRowCount * sizeof(Datum)); - blockData->valueBuffer = NULL; - blockDataArray[columnIndex] = blockData; + blockData->existsArray[columnIndex] = palloc0(blockRowCount * sizeof(bool)); + blockData->valueArray[columnIndex] = palloc0(blockRowCount * sizeof(Datum)); } } - return blockDataArray; + return blockData; } /* - * FreeColumnBlockDataArray deallocates data buffers to keep deserialized exist and + * FreeBlockData deallocates data buffers to keep deserialized exist and * value arrays for requested columns in columnMask. * ColumnBlockData->serializedValueBuffer lives in memory read/write context * so it is deallocated automatically when the context is deleted. 
*/ void -FreeColumnBlockDataArray(ColumnBlockData **blockDataArray, uint32 columnCount) +FreeBlockData(BlockData *blockData) { uint32 columnIndex = 0; - for (columnIndex = 0; columnIndex < columnCount; columnIndex++) + + if (blockData == NULL) { - ColumnBlockData *blockData = blockDataArray[columnIndex]; - if (blockData != NULL) + return; + } + + for (columnIndex = 0; columnIndex < blockData->columnCount; columnIndex++) + { + if (blockData->existsArray[columnIndex] != NULL) { - pfree(blockData->existsArray); - pfree(blockData->valueArray); - pfree(blockData); + pfree(blockData->existsArray[columnIndex]); + } + + if (blockData->valueArray[columnIndex] != NULL) + { + pfree(blockData->valueArray[columnIndex]); } } - pfree(blockDataArray); + pfree(blockData->existsArray); + pfree(blockData->valueArray); + pfree(blockData->valueBufferArray); + pfree(blockData); } @@ -332,12 +327,11 @@ CStoreTableRowCount(Relation relation) */ static StripeBuffers * LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata, - StripeFooter *stripeFooter, TupleDesc tupleDescriptor, - List *projectedColumnList, List *whereClauseList) + TupleDesc tupleDescriptor, List *projectedColumnList, + List *whereClauseList) { StripeBuffers *stripeBuffers = NULL; ColumnBuffers **columnBuffersArray = NULL; - uint64 currentColumnFileOffset = 0; uint32 columnIndex = 0; uint32 columnCount = tupleDescriptor->natts; @@ -357,15 +351,9 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata, /* load column data for projected columns */ columnBuffersArray = palloc0(columnCount * sizeof(ColumnBuffers *)); - currentColumnFileOffset = stripeMetadata->fileOffset; - for (columnIndex = 0; columnIndex < stripeFooter->columnCount; columnIndex++) + for (columnIndex = 0; columnIndex < stripeMetadata->columnCount; columnIndex++) { - uint64 existsSize = stripeFooter->existsSizeArray[columnIndex]; - uint64 valueSize = stripeFooter->valueSizeArray[columnIndex]; - uint64 existsFileOffset = currentColumnFileOffset; - uint64 
valueFileOffset = currentColumnFileOffset + existsSize; - if (projectedColumnMask[columnIndex]) { ColumnBlockSkipNode *blockSkipNode = @@ -375,15 +363,11 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata, ColumnBuffers *columnBuffers = LoadColumnBuffers(relation, blockSkipNode, blockCount, - existsFileOffset, - valueFileOffset, + stripeMetadata->fileOffset, attributeForm); columnBuffersArray[columnIndex] = columnBuffers; } - - currentColumnFileOffset += existsSize; - currentColumnFileOffset += valueSize; } stripeBuffers = palloc0(sizeof(StripeBuffers)); @@ -403,7 +387,7 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata, static void ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColumnList, uint64 blockIndex, uint64 blockRowIndex, - ColumnBlockData **blockDataArray, Datum *columnValues, + BlockData *blockData, Datum *columnValues, bool *columnNulls) { ListCell *projectedColumnCell = NULL; @@ -414,13 +398,12 @@ ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColumnList, foreach(projectedColumnCell, projectedColumnList) { Var *projectedColumn = lfirst(projectedColumnCell); - uint32 projectedColumnIndex = projectedColumn->varattno - 1; - ColumnBlockData *blockData = blockDataArray[projectedColumnIndex]; + uint32 columnIndex = projectedColumn->varattno - 1; - if (blockData->existsArray[blockRowIndex]) + if (blockData->existsArray[columnIndex][blockRowIndex]) { - columnValues[projectedColumnIndex] = blockData->valueArray[blockRowIndex]; - columnNulls[projectedColumnIndex] = false; + columnValues[columnIndex] = blockData->valueArray[columnIndex][blockRowIndex]; + columnNulls[columnIndex] = false; } } } @@ -433,7 +416,7 @@ ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColumnList, */ static ColumnBuffers * LoadColumnBuffers(Relation relation, ColumnBlockSkipNode *blockSkipNodeArray, - uint32 blockCount, uint64 existsFileOffset, uint64 valueFileOffset, + uint32 
blockCount, uint64 stripeOffset, Form_pg_attribute attributeForm) { ColumnBuffers *columnBuffers = NULL; @@ -454,7 +437,7 @@ LoadColumnBuffers(Relation relation, ColumnBlockSkipNode *blockSkipNodeArray, for (blockIndex = 0; blockIndex < blockCount; blockIndex++) { ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex]; - uint64 existsOffset = existsFileOffset + blockSkipNode->existsBlockOffset; + uint64 existsOffset = stripeOffset + blockSkipNode->existsBlockOffset; StringInfo rawExistsBuffer = ReadFromSmgr(relation, existsOffset, blockSkipNode->existsLength); @@ -466,7 +449,7 @@ LoadColumnBuffers(Relation relation, ColumnBlockSkipNode *blockSkipNodeArray, { ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex]; CompressionType compressionType = blockSkipNode->valueCompressionType; - uint64 valueOffset = valueFileOffset + blockSkipNode->valueBlockOffset; + uint64 valueOffset = stripeOffset + blockSkipNode->valueBlockOffset; StringInfo rawValueBuffer = ReadFromSmgr(relation, valueOffset, blockSkipNode->valueLength); @@ -919,20 +902,23 @@ DeserializeDatumArray(StringInfo datumBuffer, bool *existsArray, uint32 datumCou * data is not present serialized buffer, then default value (or null) is used * to fill value array. 
*/ -static void +static BlockData * DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex, - uint32 rowCount, - ColumnBlockData **blockDataArray, TupleDesc tupleDescriptor) + uint32 rowCount, TupleDesc tupleDescriptor, + List *projectedColumnList) { int columnIndex = 0; + bool *columnMask = ProjectedColumnMask(tupleDescriptor->natts, projectedColumnList); + BlockData *blockData = CreateEmptyBlockData(tupleDescriptor->natts, columnMask, + rowCount); + for (columnIndex = 0; columnIndex < stripeBuffers->columnCount; columnIndex++) { - ColumnBlockData *blockData = blockDataArray[columnIndex]; Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, columnIndex); ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex]; bool columnAdded = false; - if ((columnBuffers == NULL) && (blockData != NULL)) + if (columnBuffers == NULL && columnMask[columnIndex]) { columnAdded = true; } @@ -943,10 +929,6 @@ DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex, columnBuffers->blockBuffersArray[blockIndex]; StringInfo valueBuffer = NULL; - /* free previous block's data buffers */ - pfree(blockData->valueBuffer->data); - pfree(blockData->valueBuffer); - /* decompress and deserialize current block's data */ valueBuffer = DecompressBuffer(blockBuffers->valueBuffer, blockBuffers->valueCompressionType); @@ -958,15 +940,16 @@ DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex, pfree(blockBuffers->valueBuffer); } - DeserializeBoolArray(blockBuffers->existsBuffer, blockData->existsArray, + DeserializeBoolArray(blockBuffers->existsBuffer, + blockData->existsArray[columnIndex], rowCount); - DeserializeDatumArray(valueBuffer, blockData->existsArray, + DeserializeDatumArray(valueBuffer, blockData->existsArray[columnIndex], rowCount, attributeForm->attbyval, attributeForm->attlen, attributeForm->attalign, - blockData->valueArray); + blockData->valueArray[columnIndex]); /* store current block's data buffer to be freed 
at next block read */ - blockData->valueBuffer = valueBuffer; + blockData->valueBufferArray[columnIndex] = valueBuffer; } else if (columnAdded) { @@ -983,16 +966,19 @@ DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex, for (rowIndex = 0; rowIndex < rowCount; rowIndex++) { - blockData->existsArray[rowIndex] = true; - blockData->valueArray[rowIndex] = defaultValue; + blockData->existsArray[columnIndex][rowIndex] = true; + blockData->valueArray[columnIndex][rowIndex] = defaultValue; } } else { - memset(blockData->existsArray, false, rowCount); + memset(blockData->existsArray[columnIndex], false, + rowCount * sizeof(bool)); } } } + + return blockData; } @@ -1067,23 +1053,3 @@ ReadFromSmgr(Relation rel, uint64 offset, uint32 size) return resultBuffer; } - - -/* - * ResetUncompressedBlockData iterates over deserialized column block data - * and sets valueBuffer field to empty buffer. This field is allocated in stripe - * memory context and becomes invalid once memory context is reset. 
- */ -static void -ResetUncompressedBlockData(ColumnBlockData **blockDataArray, uint32 columnCount) -{ - uint32 columnIndex = 0; - for (columnIndex = 0; columnIndex < columnCount; columnIndex++) - { - ColumnBlockData *blockData = blockDataArray[columnIndex]; - if (blockData != NULL) - { - blockData->valueBuffer = makeStringInfo(); - } - } -} diff --git a/cstore_writer.c b/cstore_writer.c index 55a314ec4..91e73ffa8 100644 --- a/cstore_writer.c +++ b/cstore_writer.c @@ -34,7 +34,6 @@ static StripeSkipList * CreateEmptyStripeSkipList(uint32 stripeMaxRowCount, uint32 blockRowCount, uint32 columnCount); static StripeMetadata FlushStripe(TableWriteState *writeState); -static StripeFooter * CreateStripeFooter(StripeSkipList *stripeSkipList); static StringInfo SerializeBoolArray(bool *boolArray, uint32 boolArrayLength); static void SerializeSingleDatum(StringInfo datumBuffer, Datum datum, bool datumTypeByValue, int datumTypeLength, @@ -72,7 +71,7 @@ CStoreBeginWrite(Oid relationId, uint32 columnCount = 0; uint32 columnIndex = 0; bool *columnMaskArray = NULL; - ColumnBlockData **blockData = NULL; + BlockData *blockData = NULL; uint64 currentStripeId = 0; tableMetadata = ReadTableMetadata(relationId); @@ -125,20 +124,21 @@ CStoreBeginWrite(Oid relationId, columnMaskArray = palloc(columnCount * sizeof(bool)); memset(columnMaskArray, true, columnCount); - blockData = CreateEmptyBlockDataArray(columnCount, columnMaskArray, blockRowCount); + blockData = CreateEmptyBlockData(columnCount, columnMaskArray, blockRowCount); writeState = palloc0(sizeof(TableWriteState)); writeState->relationId = relationId; writeState->tableMetadata = tableMetadata; writeState->compressionType = compressionType; writeState->stripeMaxRowCount = stripeMaxRowCount; + writeState->blockRowCount = blockRowCount; writeState->tupleDescriptor = tupleDescriptor; writeState->currentFileOffset = currentFileOffset; writeState->comparisonFunctionArray = comparisonFunctionArray; writeState->stripeBuffers = NULL; 
writeState->stripeSkipList = NULL; writeState->stripeWriteContext = stripeWriteContext; - writeState->blockDataArray = blockData; + writeState->blockData = blockData; writeState->compressionBuffer = NULL; writeState->currentStripeId = currentStripeId; @@ -164,8 +164,8 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul StripeSkipList *stripeSkipList = writeState->stripeSkipList; uint32 columnCount = writeState->tupleDescriptor->natts; TableMetadata *tableMetadata = writeState->tableMetadata; - const uint32 blockRowCount = tableMetadata->blockRowCount; - ColumnBlockData **blockDataArray = writeState->blockDataArray; + const uint32 blockRowCount = writeState->blockRowCount; + BlockData *blockData = writeState->blockData; MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext); if (stripeBuffers == NULL) @@ -184,8 +184,7 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul */ for (columnIndex = 0; columnIndex < columnCount; columnIndex++) { - ColumnBlockData *blockData = blockDataArray[columnIndex]; - blockData->valueBuffer = makeStringInfo(); + blockData->valueBufferArray[columnIndex] = makeStringInfo(); } } @@ -194,14 +193,13 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul for (columnIndex = 0; columnIndex < columnCount; columnIndex++) { - ColumnBlockData *blockData = blockDataArray[columnIndex]; ColumnBlockSkipNode **blockSkipNodeArray = stripeSkipList->blockSkipNodeArray; ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[columnIndex][blockIndex]; if (columnNulls[columnIndex]) { - blockData->existsArray[blockRowIndex] = false; + blockData->existsArray[columnIndex][blockRowIndex] = false; } else { @@ -214,10 +212,11 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul Oid columnCollation = attributeForm->attcollation; char columnTypeAlign = attributeForm->attalign; - blockData->existsArray[blockRowIndex] = 
true; + blockData->existsArray[columnIndex][blockRowIndex] = true; - SerializeSingleDatum(blockData->valueBuffer, columnValues[columnIndex], - columnTypeByValue, columnTypeLength, columnTypeAlign); + SerializeSingleDatum(blockData->valueBufferArray[columnIndex], + columnValues[columnIndex], columnTypeByValue, + columnTypeLength, columnTypeAlign); UpdateBlockSkipNodeMinMax(blockSkipNode, columnValues[columnIndex], columnTypeByValue, columnTypeLength, @@ -271,7 +270,6 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul void CStoreEndWrite(TableWriteState *writeState) { - int columnCount = writeState->tupleDescriptor->natts; StripeBuffers *stripeBuffers = writeState->stripeBuffers; if (stripeBuffers != NULL) @@ -289,7 +287,7 @@ CStoreEndWrite(TableWriteState *writeState) MemoryContextDelete(writeState->stripeWriteContext); list_free_deep(writeState->tableMetadata->stripeMetadataList); pfree(writeState->comparisonFunctionArray); - FreeColumnBlockDataArray(writeState->blockDataArray, columnCount); + FreeBlockData(writeState->blockData); pfree(writeState); } @@ -415,6 +413,8 @@ WriteToSmgr(TableWriteState *writeState, char *data, uint32 dataLength) if (RelationNeedsWAL(rel)) { + XLogRecPtr recptr = 0; + XLogBeginInsert(); /* @@ -423,7 +423,7 @@ WriteToSmgr(TableWriteState *writeState, char *data, uint32 dataLength) */ XLogRegisterBuffer(0, buffer, REGBUF_FORCE_IMAGE); - XLogRecPtr recptr = XLogInsert(RM_GENERIC_ID, 0); + recptr = XLogInsert(RM_GENERIC_ID, 0); PageSetLSN(page, recptr); } @@ -448,21 +448,19 @@ static StripeMetadata FlushStripe(TableWriteState *writeState) { StripeMetadata stripeMetadata = { 0 }; - uint64 dataLength = 0; - StripeFooter *stripeFooter = NULL; uint32 columnIndex = 0; uint32 blockIndex = 0; - TableMetadata *tableMetadata = writeState->tableMetadata; StripeBuffers *stripeBuffers = writeState->stripeBuffers; StripeSkipList *stripeSkipList = writeState->stripeSkipList; ColumnBlockSkipNode **columnSkipNodeArray = 
stripeSkipList->blockSkipNodeArray; TupleDesc tupleDescriptor = writeState->tupleDescriptor; uint32 columnCount = tupleDescriptor->natts; uint32 blockCount = stripeSkipList->blockCount; - uint32 blockRowCount = tableMetadata->blockRowCount; + uint32 blockRowCount = writeState->blockRowCount; uint32 lastBlockIndex = stripeBuffers->rowCount / blockRowCount; uint32 lastBlockRowCount = stripeBuffers->rowCount % blockRowCount; uint64 initialFileOffset = writeState->currentFileOffset; + uint64 stripeSize = 0; /* * check if the last block needs serialization , the last block was not serialized @@ -473,12 +471,10 @@ FlushStripe(TableWriteState *writeState) SerializeBlockData(writeState, lastBlockIndex, lastBlockRowCount); } - /* update buffer sizes and positions in stripe skip list */ + /* update buffer sizes in stripe skip list */ for (columnIndex = 0; columnIndex < columnCount; columnIndex++) { ColumnBlockSkipNode *blockSkipNodeArray = columnSkipNodeArray[columnIndex]; - uint64 currentExistsBlockOffset = 0; - uint64 currentValueBlockOffset = 0; ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex]; for (blockIndex = 0; blockIndex < blockCount; blockIndex++) @@ -486,26 +482,29 @@ FlushStripe(TableWriteState *writeState) ColumnBlockBuffers *blockBuffers = columnBuffers->blockBuffersArray[blockIndex]; uint64 existsBufferSize = blockBuffers->existsBuffer->len; + ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex]; + + blockSkipNode->existsBlockOffset = stripeSize; + blockSkipNode->existsLength = existsBufferSize; + stripeSize += existsBufferSize; + } + + for (blockIndex = 0; blockIndex < blockCount; blockIndex++) + { + ColumnBlockBuffers *blockBuffers = + columnBuffers->blockBuffersArray[blockIndex]; uint64 valueBufferSize = blockBuffers->valueBuffer->len; CompressionType valueCompressionType = blockBuffers->valueCompressionType; ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex]; - blockSkipNode->existsBlockOffset = 
currentExistsBlockOffset; - blockSkipNode->existsLength = existsBufferSize; - blockSkipNode->valueBlockOffset = currentValueBlockOffset; + blockSkipNode->valueBlockOffset = stripeSize; blockSkipNode->valueLength = valueBufferSize; blockSkipNode->valueCompressionType = valueCompressionType; - currentExistsBlockOffset += existsBufferSize; - currentValueBlockOffset += valueBufferSize; + stripeSize += valueBufferSize; } } - /* create skip list and footer buffers */ - SaveStripeSkipList(writeState->relationId, writeState->currentStripeId, - stripeSkipList, tupleDescriptor); - stripeFooter = CreateStripeFooter(stripeSkipList); - /* * Each stripe has only one section: * Data section, in which we store data for each column continuously. @@ -543,17 +542,9 @@ FlushStripe(TableWriteState *writeState) } } - /* finally, we flush the footer buffer */ - SaveStripeFooter(writeState->relationId, - writeState->currentStripeId, - stripeFooter); - - /* set stripe metadata */ - for (columnIndex = 0; columnIndex < columnCount; columnIndex++) - { - dataLength += stripeFooter->existsSizeArray[columnIndex]; - dataLength += stripeFooter->valueSizeArray[columnIndex]; - } + /* save the stripe skip list */ + SaveStripeSkipList(writeState->relationId, writeState->currentStripeId, + stripeSkipList, tupleDescriptor); for (blockIndex = 0; blockIndex < blockCount; blockIndex++) { @@ -562,46 +553,16 @@ FlushStripe(TableWriteState *writeState) } stripeMetadata.fileOffset = initialFileOffset; - stripeMetadata.dataLength = dataLength; + stripeMetadata.dataLength = stripeSize; stripeMetadata.id = writeState->currentStripeId; stripeMetadata.blockCount = blockCount; + stripeMetadata.blockRowCount = writeState->blockRowCount; + stripeMetadata.columnCount = columnCount; return stripeMetadata; } -/* Creates and returns the footer for given stripe. 
*/ -static StripeFooter * -CreateStripeFooter(StripeSkipList *stripeSkipList) -{ - StripeFooter *stripeFooter = NULL; - uint32 columnIndex = 0; - uint32 columnCount = stripeSkipList->columnCount; - uint64 *existsSizeArray = palloc0(columnCount * sizeof(uint64)); - uint64 *valueSizeArray = palloc0(columnCount * sizeof(uint64)); - - for (columnIndex = 0; columnIndex < columnCount; columnIndex++) - { - ColumnBlockSkipNode *blockSkipNodeArray = - stripeSkipList->blockSkipNodeArray[columnIndex]; - uint32 blockIndex = 0; - - for (blockIndex = 0; blockIndex < stripeSkipList->blockCount; blockIndex++) - { - existsSizeArray[columnIndex] += blockSkipNodeArray[blockIndex].existsLength; - valueSizeArray[columnIndex] += blockSkipNodeArray[blockIndex].valueLength; - } - } - - stripeFooter = palloc0(sizeof(StripeFooter)); - stripeFooter->columnCount = columnCount; - stripeFooter->existsSizeArray = existsSizeArray; - stripeFooter->valueSizeArray = valueSizeArray; - - return stripeFooter; -} - - /* * SerializeBoolArray serializes the given boolean array and returns the result * as a StringInfo. This function packs every 8 boolean values into one byte. 
@@ -679,7 +640,7 @@ SerializeBlockData(TableWriteState *writeState, uint32 blockIndex, uint32 rowCou { uint32 columnIndex = 0; StripeBuffers *stripeBuffers = writeState->stripeBuffers; - ColumnBlockData **blockDataArray = writeState->blockDataArray; + BlockData *blockData = writeState->blockData; CompressionType requestedCompressionType = writeState->compressionType; const uint32 columnCount = stripeBuffers->columnCount; StringInfo compressionBuffer = writeState->compressionBuffer; @@ -689,9 +650,9 @@ SerializeBlockData(TableWriteState *writeState, uint32 blockIndex, uint32 rowCou { ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex]; ColumnBlockBuffers *blockBuffers = columnBuffers->blockBuffersArray[blockIndex]; - ColumnBlockData *blockData = blockDataArray[columnIndex]; - blockBuffers->existsBuffer = SerializeBoolArray(blockData->existsArray, rowCount); + blockBuffers->existsBuffer = + SerializeBoolArray(blockData->existsArray[columnIndex], rowCount); } /* @@ -702,12 +663,11 @@ SerializeBlockData(TableWriteState *writeState, uint32 blockIndex, uint32 rowCou { ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex]; ColumnBlockBuffers *blockBuffers = columnBuffers->blockBuffersArray[blockIndex]; - ColumnBlockData *blockData = blockDataArray[columnIndex]; StringInfo serializedValueBuffer = NULL; CompressionType actualCompressionType = COMPRESSION_NONE; bool compressed = false; - serializedValueBuffer = blockData->valueBuffer; + serializedValueBuffer = blockData->valueBufferArray[columnIndex]; /* the only other supported compression type is pg_lz for now */ Assert(requestedCompressionType == COMPRESSION_NONE || @@ -730,7 +690,7 @@ SerializeBlockData(TableWriteState *writeState, uint32 blockIndex, uint32 rowCou blockBuffers->valueBuffer = CopyStringInfo(serializedValueBuffer); /* valueBuffer needs to be reset for next block's data */ - resetStringInfo(blockData->valueBuffer); + 
resetStringInfo(blockData->valueBufferArray[columnIndex]); } }