diff --git a/.gitignore b/.gitignore index 0c643e590..aa7be0e36 100644 --- a/.gitignore +++ b/.gitignore @@ -42,17 +42,17 @@ /data/*.cstore /data/*.footer -/sql/block_filtering.sql -/sql/copyto.sql -/sql/create.sql -/sql/data_types.sql -/sql/load.sql +/sql/*block_filtering.sql +/sql/*copyto.sql +/sql/*create.sql +/sql/*data_types.sql +/sql/*load.sql -/expected/block_filtering.out -/expected/copyto.out -/expected/create.out -/expected/data_types.out -/expected/load.out +/expected/*block_filtering.out +/expected/*copyto.out +/expected/*create.out +/expected/*data_types.out +/expected/*load.out /results/* /.deps/* /regression.diffs diff --git a/cstore.h b/cstore.h index ad0ad20bd..e48bced61 100644 --- a/cstore.h +++ b/cstore.h @@ -78,6 +78,7 @@ typedef struct StripeMetadata uint64 fileOffset; uint64 dataLength; uint32 blockCount; + uint32 blockRowCount; uint64 rowCount; uint64 id; } StripeMetadata; @@ -128,20 +129,27 @@ typedef struct StripeSkipList /* - * ColumnBlockData represents a block of data in a column. valueArray stores + * BlockData represents a block of data for multiple columns. valueArray stores * the values of data, and existsArray stores whether a value is present. * valueBuffer is used to store (uncompressed) serialized values * referenced by Datum's in valueArray. It is only used for by-reference Datum's. * There is a one-to-one correspondence between valueArray and existsArray. */ -typedef struct ColumnBlockData +typedef struct BlockData { - bool *existsArray; - Datum *valueArray; + uint32 rowCount; + uint32 columnCount; + + /* + * Following are indexed by [column][row]. If a column is not projected, + * then existsArray[column] and valueArray[column] are NULL. + */ + bool **existsArray; + Datum **valueArray; /* valueBuffer keeps actual data for type-by-reference datums from valueArray. */ - StringInfo valueBuffer; -} ColumnBlockData; + StringInfo *valueBufferArray; +} BlockData; /* @@ -197,6 +205,7 @@ typedef struct TableReadState Oid relationId; TableMetadata *tableMetadata; + StripeMetadata *currentStripeMetadata; TupleDesc tupleDescriptor; Relation relation; @@ -212,7 +221,7 @@ typedef struct TableReadState StripeBuffers *stripeBuffers; uint32 readStripeCount; uint64 stripeReadRowCount; - ColumnBlockData **blockDataArray; + BlockData *blockData; int32 deserializedBlockIndex; } TableReadState; @@ -233,7 +242,8 @@ typedef struct TableWriteState StripeBuffers *stripeBuffers; StripeSkipList *stripeSkipList; uint32 stripeMaxRowCount; - ColumnBlockData **blockDataArray; + uint32 blockRowCount; + BlockData *blockData; /* * compressionBuffer buffer is used as temporary storage during @@ -276,10 +286,9 @@ extern void CStoreEndRead(TableReadState *state); /* Function declarations for common functions */ extern FmgrInfo * GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId, int16 procedureId); -extern ColumnBlockData ** CreateEmptyBlockDataArray(uint32 columnCount, bool *columnMask, - uint32 blockRowCount); -extern void FreeColumnBlockDataArray(ColumnBlockData **blockDataArray, - uint32 columnCount); +extern BlockData * CreateEmptyBlockData(uint32 columnCount, bool *columnMask, + uint32 blockRowCount); +extern void FreeBlockData(BlockData *blockData); extern uint64 CStoreTableRowCount(Relation relation); extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer, CompressionType compressionType); diff --git a/cstore_fdw--1.7.sql b/cstore_fdw--1.7.sql index 19801f1f8..d98652b6d 100644 --- a/cstore_fdw--1.7.sql +++ b/cstore_fdw--1.7.sql @@ -74,6 +74,7 @@ CREATE TABLE cstore_stripes ( file_offset bigint NOT NULL, data_length bigint NOT NULL, block_count int NOT NULL, + block_row_count int NOT NULL, row_count bigint NOT NULL, PRIMARY KEY (relid, stripe), FOREIGN KEY (relid) REFERENCES cstore_tables(relid) ON DELETE CASCADE INITIALLY DEFERRED diff --git a/cstore_metadata_tables.c b/cstore_metadata_tables.c index 8a67a3a9e..5285295b9 100644 --- a/cstore_metadata_tables.c +++ b/cstore_metadata_tables.c @@ -79,13 +79,14 @@ static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm); #define Anum_cstore_tables_version_minor 4 /* constants for cstore_stripe */ -#define Natts_cstore_stripes 6 +#define Natts_cstore_stripes 7 #define Anum_cstore_stripes_relid 1 #define Anum_cstore_stripes_stripe 2 #define Anum_cstore_stripes_file_offset 3 #define Anum_cstore_stripes_data_length 4 #define Anum_cstore_stripes_block_count 5 -#define Anum_cstore_stripes_row_count 6 +#define Anum_cstore_stripes_block_row_count 6 +#define Anum_cstore_stripes_row_count 7 /* constants for cstore_skipnodes */ #define Natts_cstore_skipnodes 12 @@ -328,6 +329,7 @@ InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe) Int64GetDatum(stripe->fileOffset), Int64GetDatum(stripe->dataLength), Int32GetDatum(stripe->blockCount), + Int32GetDatum(stripe->blockRowCount), Int64GetDatum(stripe->rowCount) }; @@ -388,6 +390,8 @@ ReadTableMetadata(Oid relid) datumArray[Anum_cstore_stripes_data_length - 1]); stripeMetadata->blockCount = DatumGetInt32( datumArray[Anum_cstore_stripes_block_count - 1]); + stripeMetadata->blockRowCount = DatumGetInt32( + datumArray[Anum_cstore_stripes_block_row_count - 1]); stripeMetadata->rowCount = DatumGetInt64( datumArray[Anum_cstore_stripes_row_count - 1]); diff --git a/cstore_reader.c b/cstore_reader.c index fecb45605..caf07473f 100644 --- a/cstore_reader.c +++ b/cstore_reader.c @@ -45,8 +45,8 @@ static StripeBuffers * LoadFilteredStripeBuffers(Relation relation, List *whereClauseList); static void ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColumnList, uint64 blockIndex, uint64 blockRowIndex, - ColumnBlockData **blockDataArray, - Datum *columnValues, bool *columnNulls); + BlockData *blockData, Datum *columnValues, + bool *columnNulls); static ColumnBuffers * LoadColumnBuffers(Relation relation, ColumnBlockSkipNode *blockSkipNodeArray, uint32 blockCount, uint64 existsFileOffset, @@ -70,15 +70,12 @@ static void DeserializeDatumArray(StringInfo datumBuffer, bool *existsArray, uint32 datumCount, bool datumTypeByValue, int datumTypeLength, char datumTypeAlign, Datum *datumArray); -static void DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex, - uint32 rowCount, ColumnBlockData **blockDataArray, - TupleDesc tupleDescriptor); +static BlockData * DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex, + uint32 rowCount, TupleDesc tupleDescriptor, + List *projectedColumnList); static Datum ColumnDefaultValue(TupleConstr *tupleConstraints, Form_pg_attribute attributeForm); static StringInfo ReadFromSmgr(Relation rel, uint64 offset, uint32 size); -static void ResetUncompressedBlockData(ColumnBlockData **blockDataArray, - uint32 columnCount); - /* * CStoreBeginRead initializes a cstore read operation. This function returns a @@ -91,9 +88,6 @@ CStoreBeginRead(Oid relationId, TupleDesc tupleDescriptor, TableReadState *readState = NULL; TableMetadata *tableMetadata = NULL; MemoryContext stripeReadContext = NULL; - uint32 columnCount = 0; - bool *projectedColumnMask = NULL; - ColumnBlockData **blockDataArray = NULL; tableMetadata = ReadTableMetadata(relationId); @@ -106,11 +100,6 @@ CStoreBeginRead(Oid relationId, TupleDesc tupleDescriptor, "Stripe Read Memory Context", ALLOCSET_DEFAULT_SIZES); - columnCount = tupleDescriptor->natts; - projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList); - blockDataArray = CreateEmptyBlockDataArray(columnCount, projectedColumnMask, - tableMetadata->blockRowCount); - readState = palloc0(sizeof(TableReadState)); readState->relationId = relationId; readState->tableMetadata = tableMetadata; @@ -121,7 +110,7 @@ CStoreBeginRead(Oid relationId, TupleDesc tupleDescriptor, readState->stripeReadRowCount = 0; readState->tupleDescriptor = tupleDescriptor; readState->stripeReadContext = stripeReadContext; - readState->blockDataArray = blockDataArray; + readState->blockData = NULL; readState->deserializedBlockIndex = -1; return readState; @@ -138,7 +127,7 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu { uint32 blockIndex = 0; uint32 blockRowIndex = 0; - TableMetadata *tableMetadata = readState->tableMetadata; + StripeMetadata *stripeMetadata = readState->currentStripeMetadata; MemoryContext oldContext = NULL; /* @@ -151,7 +140,7 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu { StripeBuffers *stripeBuffers = NULL; StripeMetadata *stripeMetadata = NULL; - List *stripeMetadataList = tableMetadata->stripeMetadataList; + List *stripeMetadataList = readState->tableMetadata->stripeMetadataList; uint32 stripeCount = list_length(stripeMetadataList); StripeFooter *stripeFooter = NULL; @@ -163,6 +152,7 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu oldContext = MemoryContextSwitchTo(readState->stripeReadContext); MemoryContextReset(readState->stripeReadContext); + readState->blockData = NULL; stripeMetadata = list_nth(stripeMetadataList, readState->readStripeCount); stripeFooter = ReadStripeFooter(readState->relationId, @@ -175,6 +165,7 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu readState->projectedColumnList, readState->whereClauseList); readState->readStripeCount++; + readState->currentStripeMetadata = stripeMetadata; MemoryContextSwitchTo(oldContext); @@ -183,37 +174,38 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu readState->stripeBuffers = stripeBuffers; readState->stripeReadRowCount = 0; readState->deserializedBlockIndex = -1; - ResetUncompressedBlockData(readState->blockDataArray, - stripeBuffers->columnCount); break; } } - blockIndex = readState->stripeReadRowCount / tableMetadata->blockRowCount; - blockRowIndex = readState->stripeReadRowCount % tableMetadata->blockRowCount; + blockIndex = readState->stripeReadRowCount / stripeMetadata->blockRowCount; + blockRowIndex = readState->stripeReadRowCount % stripeMetadata->blockRowCount; if (blockIndex != readState->deserializedBlockIndex) { uint32 lastBlockIndex = 0; uint32 blockRowCount = 0; uint32 stripeRowCount = 0; + StripeMetadata *stripeMetadata = readState->currentStripeMetadata; - stripeRowCount = readState->stripeBuffers->rowCount; - lastBlockIndex = stripeRowCount / tableMetadata->blockRowCount; + stripeRowCount = stripeMetadata->rowCount; + lastBlockIndex = stripeRowCount / stripeMetadata->blockRowCount; if (blockIndex == lastBlockIndex) { - blockRowCount = stripeRowCount % tableMetadata->blockRowCount; + blockRowCount = stripeRowCount % stripeMetadata->blockRowCount; } else { - blockRowCount = tableMetadata->blockRowCount; + blockRowCount = stripeMetadata->blockRowCount; } oldContext = MemoryContextSwitchTo(readState->stripeReadContext); - DeserializeBlockData(readState->stripeBuffers, blockIndex, - blockRowCount, readState->blockDataArray, - readState->tupleDescriptor); + FreeBlockData(readState->blockData); + readState->blockData = + DeserializeBlockData(readState->stripeBuffers, blockIndex, + blockRowCount, readState->tupleDescriptor, + readState->projectedColumnList); MemoryContextSwitchTo(oldContext); @@ -221,7 +213,7 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu } ReadStripeNextRow(readState->stripeBuffers, readState->projectedColumnList, - blockIndex, blockRowIndex, readState->blockDataArray, + blockIndex, blockRowIndex, readState->blockData, columnValues, columnNulls); /* @@ -242,11 +234,8 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu void CStoreEndRead(TableReadState *readState) { - int columnCount = readState->tupleDescriptor->natts; - MemoryContextDelete(readState->stripeReadContext); list_free_deep(readState->tableMetadata->stripeMetadataList); - FreeColumnBlockDataArray(readState->blockDataArray, columnCount); pfree(readState->tableMetadata); pfree(readState); } @@ -256,52 +245,65 @@ CStoreEndRead(TableReadState *readState) * CreateEmptyBlockDataArray creates data buffers to keep deserialized exist and * value arrays for requested columns in columnMask. */ -ColumnBlockData ** -CreateEmptyBlockDataArray(uint32 columnCount, bool *columnMask, uint32 blockRowCount) +BlockData * +CreateEmptyBlockData(uint32 columnCount, bool *columnMask, uint32 blockRowCount) { uint32 columnIndex = 0; - ColumnBlockData **blockDataArray = palloc0(columnCount * sizeof(ColumnBlockData *)); + + BlockData *blockData = palloc0(sizeof(BlockData)); + blockData->existsArray = palloc0(columnCount * sizeof(bool *)); + blockData->valueArray = palloc0(columnCount * sizeof(Datum *)); + blockData->valueBufferArray = palloc0(columnCount * sizeof(StringInfo)); + blockData->columnCount = columnCount; + blockData->rowCount = blockRowCount; /* allocate block memory for deserialized data */ for (columnIndex = 0; columnIndex < columnCount; columnIndex++) { if (columnMask[columnIndex]) { - ColumnBlockData *blockData = palloc0(sizeof(ColumnBlockData)); - - blockData->existsArray = palloc0(blockRowCount * sizeof(bool)); - blockData->valueArray = palloc0(blockRowCount * sizeof(Datum)); - blockData->valueBuffer = NULL; - blockDataArray[columnIndex] = blockData; + blockData->existsArray[columnIndex] = palloc0(blockRowCount * sizeof(bool)); + blockData->valueArray[columnIndex] = palloc0(blockRowCount * sizeof(Datum)); + blockData->valueBufferArray[columnIndex] = NULL; } } - return blockDataArray; + return blockData; } /* - * FreeColumnBlockDataArray deallocates data buffers to keep deserialized exist and + * FreeBlockData deallocates data buffers to keep deserialized exist and * value arrays for requested columns in columnMask. * ColumnBlockData->serializedValueBuffer lives in memory read/write context * so it is deallocated automatically when the context is deleted. */ void -FreeColumnBlockDataArray(ColumnBlockData **blockDataArray, uint32 columnCount) +FreeBlockData(BlockData *blockData) { uint32 columnIndex = 0; - for (columnIndex = 0; columnIndex < columnCount; columnIndex++) + + if (blockData == NULL) { - ColumnBlockData *blockData = blockDataArray[columnIndex]; - if (blockData != NULL) + return; + } + + for (columnIndex = 0; columnIndex < blockData->columnCount; columnIndex++) + { + if (blockData->existsArray[columnIndex] != NULL) { - pfree(blockData->existsArray); - pfree(blockData->valueArray); - pfree(blockData); + pfree(blockData->existsArray[columnIndex]); + } + + if (blockData->valueArray[columnIndex] != NULL) + { + pfree(blockData->valueArray[columnIndex]); } } - pfree(blockDataArray); + pfree(blockData->existsArray); + pfree(blockData->valueArray); + pfree(blockData); } @@ -403,7 +405,7 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata, static void ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColumnList, uint64 blockIndex, uint64 blockRowIndex, - ColumnBlockData **blockDataArray, Datum *columnValues, + BlockData *blockData, Datum *columnValues, bool *columnNulls) { ListCell *projectedColumnCell = NULL; @@ -414,13 +416,12 @@ ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColumnList, foreach(projectedColumnCell, projectedColumnList) { Var *projectedColumn = lfirst(projectedColumnCell); - uint32 projectedColumnIndex = projectedColumn->varattno - 1; - ColumnBlockData *blockData = blockDataArray[projectedColumnIndex]; + uint32 columnIndex = projectedColumn->varattno - 1; - if (blockData->existsArray[blockRowIndex]) + if (blockData->existsArray[columnIndex][blockRowIndex]) { - columnValues[projectedColumnIndex] = blockData->valueArray[blockRowIndex]; - columnNulls[projectedColumnIndex] = false; + columnValues[columnIndex] = blockData->valueArray[columnIndex][blockRowIndex]; + columnNulls[columnIndex] = false; } } } @@ -919,20 +920,23 @@ DeserializeDatumArray(StringInfo datumBuffer, bool *existsArray, uint32 datumCou * data is not present serialized buffer, then default value (or null) is used * to fill value array. */ -static void +static BlockData * DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex, - uint32 rowCount, - ColumnBlockData **blockDataArray, TupleDesc tupleDescriptor) + uint32 rowCount, TupleDesc tupleDescriptor, + List *projectedColumnList) { int columnIndex = 0; + bool *columnMask = ProjectedColumnMask(tupleDescriptor->natts, projectedColumnList); + BlockData *blockData = CreateEmptyBlockData(tupleDescriptor->natts, columnMask, + rowCount); + for (columnIndex = 0; columnIndex < stripeBuffers->columnCount; columnIndex++) { - ColumnBlockData *blockData = blockDataArray[columnIndex]; Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, columnIndex); ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex]; bool columnAdded = false; - if ((columnBuffers == NULL) && (blockData != NULL)) + if (columnBuffers == NULL && columnMask[columnIndex]) { columnAdded = true; } @@ -943,10 +947,6 @@ DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex, columnBuffers->blockBuffersArray[blockIndex]; StringInfo valueBuffer = NULL; - /* free previous block's data buffers */ - pfree(blockData->valueBuffer->data); - pfree(blockData->valueBuffer); - /* decompress and deserialize current block's data */ valueBuffer = DecompressBuffer(blockBuffers->valueBuffer, blockBuffers->valueCompressionType); @@ -958,15 +958,16 @@ DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex, pfree(blockBuffers->valueBuffer); } - DeserializeBoolArray(blockBuffers->existsBuffer, blockData->existsArray, + DeserializeBoolArray(blockBuffers->existsBuffer, + blockData->existsArray[columnIndex], rowCount); - DeserializeDatumArray(valueBuffer, blockData->existsArray, + DeserializeDatumArray(valueBuffer, blockData->existsArray[columnIndex], rowCount, attributeForm->attbyval, attributeForm->attlen, attributeForm->attalign, - blockData->valueArray); + blockData->valueArray[columnIndex]); /* store current block's data buffer to be freed at next block read */ - blockData->valueBuffer = valueBuffer; + blockData->valueBufferArray[columnIndex] = valueBuffer; } else if (columnAdded) { @@ -983,16 +984,19 @@ DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex, for (rowIndex = 0; rowIndex < rowCount; rowIndex++) { - blockData->existsArray[rowIndex] = true; - blockData->valueArray[rowIndex] = defaultValue; + blockData->existsArray[columnIndex][rowIndex] = true; + blockData->valueArray[columnIndex][rowIndex] = defaultValue; } } else { - memset(blockData->existsArray, false, rowCount); + memset(blockData->existsArray[columnIndex], false, + rowCount * sizeof(bool)); } } } + + return blockData; } @@ -1067,23 +1071,3 @@ ReadFromSmgr(Relation rel, uint64 offset, uint32 size) return resultBuffer; } - - -/* - * ResetUncompressedBlockData iterates over deserialized column block data - * and sets valueBuffer field to empty buffer. This field is allocated in stripe - * memory context and becomes invalid once memory context is reset. - */ -static void -ResetUncompressedBlockData(ColumnBlockData **blockDataArray, uint32 columnCount) -{ - uint32 columnIndex = 0; - for (columnIndex = 0; columnIndex < columnCount; columnIndex++) - { - ColumnBlockData *blockData = blockDataArray[columnIndex]; - if (blockData != NULL) - { - blockData->valueBuffer = makeStringInfo(); - } - } -} diff --git a/cstore_writer.c b/cstore_writer.c index 55a314ec4..cf0fa58fe 100644 --- a/cstore_writer.c +++ b/cstore_writer.c @@ -72,7 +72,7 @@ CStoreBeginWrite(Oid relationId, uint32 columnCount = 0; uint32 columnIndex = 0; bool *columnMaskArray = NULL; - ColumnBlockData **blockData = NULL; + BlockData *blockData = NULL; uint64 currentStripeId = 0; tableMetadata = ReadTableMetadata(relationId); @@ -125,20 +125,21 @@ CStoreBeginWrite(Oid relationId, columnMaskArray = palloc(columnCount * sizeof(bool)); memset(columnMaskArray, true, columnCount); - blockData = CreateEmptyBlockDataArray(columnCount, columnMaskArray, blockRowCount); + blockData = CreateEmptyBlockData(columnCount, columnMaskArray, blockRowCount); writeState = palloc0(sizeof(TableWriteState)); writeState->relationId = relationId; writeState->tableMetadata = tableMetadata; writeState->compressionType = compressionType; writeState->stripeMaxRowCount = stripeMaxRowCount; + writeState->blockRowCount = blockRowCount; writeState->tupleDescriptor = tupleDescriptor; writeState->currentFileOffset = currentFileOffset; writeState->comparisonFunctionArray = comparisonFunctionArray; writeState->stripeBuffers = NULL; writeState->stripeSkipList = NULL; writeState->stripeWriteContext = stripeWriteContext; - writeState->blockDataArray = blockData; + writeState->blockData = blockData; writeState->compressionBuffer = NULL; writeState->currentStripeId = currentStripeId; @@ -164,8 +165,8 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul StripeSkipList *stripeSkipList = writeState->stripeSkipList; uint32 columnCount = writeState->tupleDescriptor->natts; TableMetadata *tableMetadata = writeState->tableMetadata; - const uint32 blockRowCount = tableMetadata->blockRowCount; - ColumnBlockData **blockDataArray = writeState->blockDataArray; + const uint32 blockRowCount = writeState->blockRowCount; + BlockData *blockData = writeState->blockData; MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext); if (stripeBuffers == NULL) @@ -184,8 +185,7 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul */ for (columnIndex = 0; columnIndex < columnCount; columnIndex++) { - ColumnBlockData *blockData = blockDataArray[columnIndex]; - blockData->valueBuffer = makeStringInfo(); + blockData->valueBufferArray[columnIndex] = makeStringInfo(); } } @@ -194,14 +194,13 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul for (columnIndex = 0; columnIndex < columnCount; columnIndex++) { - ColumnBlockData *blockData = blockDataArray[columnIndex]; ColumnBlockSkipNode **blockSkipNodeArray = stripeSkipList->blockSkipNodeArray; ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[columnIndex][blockIndex]; if (columnNulls[columnIndex]) { - blockData->existsArray[blockRowIndex] = false; + blockData->existsArray[columnIndex][blockRowIndex] = false; } else { @@ -214,10 +213,11 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul Oid columnCollation = attributeForm->attcollation; char columnTypeAlign = attributeForm->attalign; - blockData->existsArray[blockRowIndex] = true; + blockData->existsArray[columnIndex][blockRowIndex] = true; - SerializeSingleDatum(blockData->valueBuffer, columnValues[columnIndex], - columnTypeByValue, columnTypeLength, columnTypeAlign); + SerializeSingleDatum(blockData->valueBufferArray[columnIndex], + columnValues[columnIndex], columnTypeByValue, + columnTypeLength, columnTypeAlign); UpdateBlockSkipNodeMinMax(blockSkipNode, columnValues[columnIndex], columnTypeByValue, columnTypeLength, @@ -271,7 +271,6 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul void CStoreEndWrite(TableWriteState *writeState) { - int columnCount = writeState->tupleDescriptor->natts; StripeBuffers *stripeBuffers = writeState->stripeBuffers; if (stripeBuffers != NULL) @@ -289,7 +288,7 @@ CStoreEndWrite(TableWriteState *writeState) MemoryContextDelete(writeState->stripeWriteContext); list_free_deep(writeState->tableMetadata->stripeMetadataList); pfree(writeState->comparisonFunctionArray); - FreeColumnBlockDataArray(writeState->blockDataArray, columnCount); + FreeBlockData(writeState->blockData); pfree(writeState); } @@ -415,6 +414,8 @@ WriteToSmgr(TableWriteState *writeState, char *data, uint32 dataLength) if (RelationNeedsWAL(rel)) { + XLogRecPtr recptr = 0; + XLogBeginInsert(); /* @@ -423,7 +424,7 @@ WriteToSmgr(TableWriteState *writeState, char *data, uint32 dataLength) */ XLogRegisterBuffer(0, buffer, REGBUF_FORCE_IMAGE); - XLogRecPtr recptr = XLogInsert(RM_GENERIC_ID, 0); + recptr = XLogInsert(RM_GENERIC_ID, 0); PageSetLSN(page, recptr); } @@ -452,14 +453,13 @@ FlushStripe(TableWriteState *writeState) StripeFooter *stripeFooter = NULL; uint32 columnIndex = 0; uint32 blockIndex = 0; - TableMetadata *tableMetadata = writeState->tableMetadata; StripeBuffers *stripeBuffers = writeState->stripeBuffers; StripeSkipList *stripeSkipList = writeState->stripeSkipList; ColumnBlockSkipNode **columnSkipNodeArray = stripeSkipList->blockSkipNodeArray; TupleDesc tupleDescriptor = writeState->tupleDescriptor; uint32 columnCount = tupleDescriptor->natts; uint32 blockCount = stripeSkipList->blockCount; - uint32 blockRowCount = tableMetadata->blockRowCount; + uint32 blockRowCount = writeState->blockRowCount; uint32 lastBlockIndex = stripeBuffers->rowCount / blockRowCount; uint32 lastBlockRowCount = stripeBuffers->rowCount % blockRowCount; uint64 initialFileOffset = writeState->currentFileOffset; @@ -565,6 +565,7 @@ FlushStripe(TableWriteState *writeState) stripeMetadata.dataLength = dataLength; stripeMetadata.id = writeState->currentStripeId; stripeMetadata.blockCount = blockCount; + stripeMetadata.blockRowCount = writeState->blockRowCount; return stripeMetadata; } @@ -679,7 +680,7 @@ SerializeBlockData(TableWriteState *writeState, uint32 blockIndex, uint32 rowCou { uint32 columnIndex = 0; StripeBuffers *stripeBuffers = writeState->stripeBuffers; - ColumnBlockData **blockDataArray = writeState->blockDataArray; + BlockData *blockData = writeState->blockData; CompressionType requestedCompressionType = writeState->compressionType; const uint32 columnCount = stripeBuffers->columnCount; StringInfo compressionBuffer = writeState->compressionBuffer; @@ -689,9 +690,9 @@ SerializeBlockData(TableWriteState *writeState, uint32 blockIndex, uint32 rowCou { ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex]; ColumnBlockBuffers *blockBuffers = columnBuffers->blockBuffersArray[blockIndex]; - ColumnBlockData *blockData = blockDataArray[columnIndex]; - blockBuffers->existsBuffer = SerializeBoolArray(blockData->existsArray, rowCount); + blockBuffers->existsBuffer = + SerializeBoolArray(blockData->existsArray[columnIndex], rowCount); } /* @@ -702,12 +703,11 @@ SerializeBlockData(TableWriteState *writeState, uint32 blockIndex, uint32 rowCou { ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex]; ColumnBlockBuffers *blockBuffers = columnBuffers->blockBuffersArray[blockIndex]; - ColumnBlockData *blockData = blockDataArray[columnIndex]; StringInfo serializedValueBuffer = NULL; CompressionType actualCompressionType = COMPRESSION_NONE; bool compressed = false; - serializedValueBuffer = blockData->valueBuffer; + serializedValueBuffer = blockData->valueBufferArray[columnIndex]; /* the only other supported compression type is pg_lz for now */ Assert(requestedCompressionType == COMPRESSION_NONE || @@ -730,7 +730,7 @@ SerializeBlockData(TableWriteState *writeState, uint32 blockIndex, uint32 rowCou blockBuffers->valueBuffer = CopyStringInfo(serializedValueBuffer); /* valueBuffer needs to be reset for next block's data */ - resetStringInfo(blockData->valueBuffer); + resetStringInfo(blockData->valueBufferArray[columnIndex]); } }