mirror of https://github.com/citusdata/citus.git
Make block offsets relative to stripe start
parent
bc585be3ed
commit
db5287069f
1
cstore.h
1
cstore.h
|
@ -235,6 +235,7 @@ typedef struct TableWriteState
|
||||||
TupleDesc tupleDescriptor;
|
TupleDesc tupleDescriptor;
|
||||||
FmgrInfo **comparisonFunctionArray;
|
FmgrInfo **comparisonFunctionArray;
|
||||||
uint64 currentFileOffset;
|
uint64 currentFileOffset;
|
||||||
|
uint64 currentStripeOffset;
|
||||||
Relation relation;
|
Relation relation;
|
||||||
|
|
||||||
MemoryContext stripeWriteContext;
|
MemoryContext stripeWriteContext;
|
||||||
|
|
|
@ -49,8 +49,7 @@ static void ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColum
|
||||||
bool *columnNulls);
|
bool *columnNulls);
|
||||||
static ColumnBuffers * LoadColumnBuffers(Relation relation,
|
static ColumnBuffers * LoadColumnBuffers(Relation relation,
|
||||||
ColumnBlockSkipNode *blockSkipNodeArray,
|
ColumnBlockSkipNode *blockSkipNodeArray,
|
||||||
uint32 blockCount, uint64 existsFileOffset,
|
uint32 blockCount, uint64 stripeOffset,
|
||||||
uint64 valueFileOffset,
|
|
||||||
Form_pg_attribute attributeForm);
|
Form_pg_attribute attributeForm);
|
||||||
static bool * SelectedBlockMask(StripeSkipList *stripeSkipList,
|
static bool * SelectedBlockMask(StripeSkipList *stripeSkipList,
|
||||||
List *projectedColumnList, List *whereClauseList);
|
List *projectedColumnList, List *whereClauseList);
|
||||||
|
@ -365,8 +364,6 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
||||||
{
|
{
|
||||||
uint64 existsSize = stripeFooter->existsSizeArray[columnIndex];
|
uint64 existsSize = stripeFooter->existsSizeArray[columnIndex];
|
||||||
uint64 valueSize = stripeFooter->valueSizeArray[columnIndex];
|
uint64 valueSize = stripeFooter->valueSizeArray[columnIndex];
|
||||||
uint64 existsFileOffset = currentColumnFileOffset;
|
|
||||||
uint64 valueFileOffset = currentColumnFileOffset + existsSize;
|
|
||||||
|
|
||||||
if (projectedColumnMask[columnIndex])
|
if (projectedColumnMask[columnIndex])
|
||||||
{
|
{
|
||||||
|
@ -377,8 +374,7 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
||||||
|
|
||||||
ColumnBuffers *columnBuffers = LoadColumnBuffers(relation, blockSkipNode,
|
ColumnBuffers *columnBuffers = LoadColumnBuffers(relation, blockSkipNode,
|
||||||
blockCount,
|
blockCount,
|
||||||
existsFileOffset,
|
stripeMetadata->fileOffset,
|
||||||
valueFileOffset,
|
|
||||||
attributeForm);
|
attributeForm);
|
||||||
|
|
||||||
columnBuffersArray[columnIndex] = columnBuffers;
|
columnBuffersArray[columnIndex] = columnBuffers;
|
||||||
|
@ -434,7 +430,7 @@ ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColumnList,
|
||||||
*/
|
*/
|
||||||
static ColumnBuffers *
|
static ColumnBuffers *
|
||||||
LoadColumnBuffers(Relation relation, ColumnBlockSkipNode *blockSkipNodeArray,
|
LoadColumnBuffers(Relation relation, ColumnBlockSkipNode *blockSkipNodeArray,
|
||||||
uint32 blockCount, uint64 existsFileOffset, uint64 valueFileOffset,
|
uint32 blockCount, uint64 stripeOffset,
|
||||||
Form_pg_attribute attributeForm)
|
Form_pg_attribute attributeForm)
|
||||||
{
|
{
|
||||||
ColumnBuffers *columnBuffers = NULL;
|
ColumnBuffers *columnBuffers = NULL;
|
||||||
|
@ -455,7 +451,7 @@ LoadColumnBuffers(Relation relation, ColumnBlockSkipNode *blockSkipNodeArray,
|
||||||
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
|
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
|
||||||
{
|
{
|
||||||
ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex];
|
ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex];
|
||||||
uint64 existsOffset = existsFileOffset + blockSkipNode->existsBlockOffset;
|
uint64 existsOffset = stripeOffset + blockSkipNode->existsBlockOffset;
|
||||||
StringInfo rawExistsBuffer = ReadFromSmgr(relation, existsOffset,
|
StringInfo rawExistsBuffer = ReadFromSmgr(relation, existsOffset,
|
||||||
blockSkipNode->existsLength);
|
blockSkipNode->existsLength);
|
||||||
|
|
||||||
|
@ -467,7 +463,7 @@ LoadColumnBuffers(Relation relation, ColumnBlockSkipNode *blockSkipNodeArray,
|
||||||
{
|
{
|
||||||
ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex];
|
ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex];
|
||||||
CompressionType compressionType = blockSkipNode->valueCompressionType;
|
CompressionType compressionType = blockSkipNode->valueCompressionType;
|
||||||
uint64 valueOffset = valueFileOffset + blockSkipNode->valueBlockOffset;
|
uint64 valueOffset = stripeOffset + blockSkipNode->valueBlockOffset;
|
||||||
StringInfo rawValueBuffer = ReadFromSmgr(relation, valueOffset,
|
StringInfo rawValueBuffer = ReadFromSmgr(relation, valueOffset,
|
||||||
blockSkipNode->valueLength);
|
blockSkipNode->valueLength);
|
||||||
|
|
||||||
|
|
|
@ -242,6 +242,7 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul
|
||||||
MemoryContextReset(writeState->stripeWriteContext);
|
MemoryContextReset(writeState->stripeWriteContext);
|
||||||
|
|
||||||
writeState->currentStripeId++;
|
writeState->currentStripeId++;
|
||||||
|
writeState->currentStripeOffset = 0;
|
||||||
|
|
||||||
/* set stripe data and skip list to NULL so they are recreated next time */
|
/* set stripe data and skip list to NULL so they are recreated next time */
|
||||||
writeState->stripeBuffers = NULL;
|
writeState->stripeBuffers = NULL;
|
||||||
|
@ -473,12 +474,10 @@ FlushStripe(TableWriteState *writeState)
|
||||||
SerializeBlockData(writeState, lastBlockIndex, lastBlockRowCount);
|
SerializeBlockData(writeState, lastBlockIndex, lastBlockRowCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* update buffer sizes and positions in stripe skip list */
|
/* update buffer sizes in stripe skip list */
|
||||||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||||
{
|
{
|
||||||
ColumnBlockSkipNode *blockSkipNodeArray = columnSkipNodeArray[columnIndex];
|
ColumnBlockSkipNode *blockSkipNodeArray = columnSkipNodeArray[columnIndex];
|
||||||
uint64 currentExistsBlockOffset = 0;
|
|
||||||
uint64 currentValueBlockOffset = 0;
|
|
||||||
ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex];
|
ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex];
|
||||||
|
|
||||||
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
|
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
|
||||||
|
@ -486,21 +485,36 @@ FlushStripe(TableWriteState *writeState)
|
||||||
ColumnBlockBuffers *blockBuffers =
|
ColumnBlockBuffers *blockBuffers =
|
||||||
columnBuffers->blockBuffersArray[blockIndex];
|
columnBuffers->blockBuffersArray[blockIndex];
|
||||||
uint64 existsBufferSize = blockBuffers->existsBuffer->len;
|
uint64 existsBufferSize = blockBuffers->existsBuffer->len;
|
||||||
|
ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex];
|
||||||
|
|
||||||
|
blockSkipNode->existsBlockOffset = writeState->currentStripeOffset;
|
||||||
|
blockSkipNode->existsLength = existsBufferSize;
|
||||||
|
writeState->currentStripeOffset += existsBufferSize;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||||
|
{
|
||||||
|
ColumnBlockSkipNode *blockSkipNodeArray = columnSkipNodeArray[columnIndex];
|
||||||
|
ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex];
|
||||||
|
|
||||||
|
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
|
||||||
|
{
|
||||||
|
ColumnBlockBuffers *blockBuffers =
|
||||||
|
columnBuffers->blockBuffersArray[blockIndex];
|
||||||
uint64 valueBufferSize = blockBuffers->valueBuffer->len;
|
uint64 valueBufferSize = blockBuffers->valueBuffer->len;
|
||||||
CompressionType valueCompressionType = blockBuffers->valueCompressionType;
|
CompressionType valueCompressionType = blockBuffers->valueCompressionType;
|
||||||
ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex];
|
ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex];
|
||||||
|
|
||||||
blockSkipNode->existsBlockOffset = currentExistsBlockOffset;
|
blockSkipNode->valueBlockOffset = writeState->currentStripeOffset;
|
||||||
blockSkipNode->existsLength = existsBufferSize;
|
|
||||||
blockSkipNode->valueBlockOffset = currentValueBlockOffset;
|
|
||||||
blockSkipNode->valueLength = valueBufferSize;
|
blockSkipNode->valueLength = valueBufferSize;
|
||||||
blockSkipNode->valueCompressionType = valueCompressionType;
|
blockSkipNode->valueCompressionType = valueCompressionType;
|
||||||
|
|
||||||
currentExistsBlockOffset += existsBufferSize;
|
writeState->currentStripeOffset += valueBufferSize;
|
||||||
currentValueBlockOffset += valueBufferSize;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* create skip list and footer buffers */
|
/* create skip list and footer buffers */
|
||||||
SaveStripeSkipList(writeState->relationId, writeState->currentStripeId,
|
SaveStripeSkipList(writeState->relationId, writeState->currentStripeId,
|
||||||
stripeSkipList, tupleDescriptor);
|
stripeSkipList, tupleDescriptor);
|
||||||
|
|
Loading…
Reference in New Issue