From db5287069ff8b765746470e199e25fce3b979a2f Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Mon, 21 Sep 2020 22:10:25 -0700 Subject: [PATCH] Make block offsets relative to stripe start --- cstore.h | 1 + cstore_reader.c | 14 +++++--------- cstore_writer.c | 30 ++++++++++++++++++++++-------- 3 files changed, 28 insertions(+), 17 deletions(-) diff --git a/cstore.h b/cstore.h index e48bced61..f88ebbdab 100644 --- a/cstore.h +++ b/cstore.h @@ -235,6 +235,7 @@ typedef struct TableWriteState TupleDesc tupleDescriptor; FmgrInfo **comparisonFunctionArray; uint64 currentFileOffset; + uint64 currentStripeOffset; Relation relation; MemoryContext stripeWriteContext; diff --git a/cstore_reader.c b/cstore_reader.c index caf07473f..6b5d7ed00 100644 --- a/cstore_reader.c +++ b/cstore_reader.c @@ -49,8 +49,7 @@ static void ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColum bool *columnNulls); static ColumnBuffers * LoadColumnBuffers(Relation relation, ColumnBlockSkipNode *blockSkipNodeArray, - uint32 blockCount, uint64 existsFileOffset, - uint64 valueFileOffset, + uint32 blockCount, uint64 stripeOffset, Form_pg_attribute attributeForm); static bool * SelectedBlockMask(StripeSkipList *stripeSkipList, List *projectedColumnList, List *whereClauseList); @@ -365,8 +364,6 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata, { uint64 existsSize = stripeFooter->existsSizeArray[columnIndex]; uint64 valueSize = stripeFooter->valueSizeArray[columnIndex]; - uint64 existsFileOffset = currentColumnFileOffset; - uint64 valueFileOffset = currentColumnFileOffset + existsSize; if (projectedColumnMask[columnIndex]) { @@ -377,8 +374,7 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata, ColumnBuffers *columnBuffers = LoadColumnBuffers(relation, blockSkipNode, blockCount, - existsFileOffset, - valueFileOffset, + stripeMetadata->fileOffset, attributeForm); columnBuffersArray[columnIndex] = columnBuffers; @@ -434,7 +430,7 @@ ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColumnList, */ static ColumnBuffers * LoadColumnBuffers(Relation relation, ColumnBlockSkipNode *blockSkipNodeArray, - uint32 blockCount, uint64 existsFileOffset, uint64 valueFileOffset, + uint32 blockCount, uint64 stripeOffset, Form_pg_attribute attributeForm) { ColumnBuffers *columnBuffers = NULL; @@ -455,7 +451,7 @@ LoadColumnBuffers(Relation relation, ColumnBlockSkipNode *blockSkipNodeArray, for (blockIndex = 0; blockIndex < blockCount; blockIndex++) { ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex]; - uint64 existsOffset = existsFileOffset + blockSkipNode->existsBlockOffset; + uint64 existsOffset = stripeOffset + blockSkipNode->existsBlockOffset; StringInfo rawExistsBuffer = ReadFromSmgr(relation, existsOffset, blockSkipNode->existsLength); @@ -467,7 +463,7 @@ LoadColumnBuffers(Relation relation, ColumnBlockSkipNode *blockSkipNodeArray, { ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex]; CompressionType compressionType = blockSkipNode->valueCompressionType; - uint64 valueOffset = valueFileOffset + blockSkipNode->valueBlockOffset; + uint64 valueOffset = stripeOffset + blockSkipNode->valueBlockOffset; StringInfo rawValueBuffer = ReadFromSmgr(relation, valueOffset, blockSkipNode->valueLength); diff --git a/cstore_writer.c b/cstore_writer.c index cf0fa58fe..65871b511 100644 --- a/cstore_writer.c +++ b/cstore_writer.c @@ -242,6 +242,7 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul MemoryContextReset(writeState->stripeWriteContext); writeState->currentStripeId++; + writeState->currentStripeOffset = 0; /* set stripe data and skip list to NULL so they are recreated next time */ writeState->stripeBuffers = NULL; @@ -473,12 +474,10 @@ FlushStripe(TableWriteState *writeState) SerializeBlockData(writeState, lastBlockIndex, lastBlockRowCount); } - /* update buffer sizes and positions in stripe skip list */ + /* update buffer sizes in stripe skip list */ for (columnIndex = 0; columnIndex < columnCount; columnIndex++) { ColumnBlockSkipNode *blockSkipNodeArray = columnSkipNodeArray[columnIndex]; - uint64 currentExistsBlockOffset = 0; - uint64 currentValueBlockOffset = 0; ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex]; for (blockIndex = 0; blockIndex < blockCount; blockIndex++) @@ -486,21 +485,36 @@ FlushStripe(TableWriteState *writeState) ColumnBlockBuffers *blockBuffers = columnBuffers->blockBuffersArray[blockIndex]; uint64 existsBufferSize = blockBuffers->existsBuffer->len; + ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex]; + + blockSkipNode->existsBlockOffset = writeState->currentStripeOffset; + blockSkipNode->existsLength = existsBufferSize; + writeState->currentStripeOffset += existsBufferSize; + } + } + + for (columnIndex = 0; columnIndex < columnCount; columnIndex++) + { + ColumnBlockSkipNode *blockSkipNodeArray = columnSkipNodeArray[columnIndex]; + ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex]; + + for (blockIndex = 0; blockIndex < blockCount; blockIndex++) + { + ColumnBlockBuffers *blockBuffers = + columnBuffers->blockBuffersArray[blockIndex]; uint64 valueBufferSize = blockBuffers->valueBuffer->len; CompressionType valueCompressionType = blockBuffers->valueCompressionType; ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex]; - blockSkipNode->existsBlockOffset = currentExistsBlockOffset; - blockSkipNode->existsLength = existsBufferSize; - blockSkipNode->valueBlockOffset = currentValueBlockOffset; + blockSkipNode->valueBlockOffset = writeState->currentStripeOffset; blockSkipNode->valueLength = valueBufferSize; blockSkipNode->valueCompressionType = valueCompressionType; - currentExistsBlockOffset += existsBufferSize; - currentValueBlockOffset += valueBufferSize; + writeState->currentStripeOffset += valueBufferSize; } } + /* create skip list and footer buffers */ SaveStripeSkipList(writeState->relationId, writeState->currentStripeId, stripeSkipList, tupleDescriptor);