Merge pull request #10 from citusdata/cleanup_metadata

Metadata simplification and some refactoring
merge-cstore-pykello
Hadi Moshayedi 2020-09-23 10:42:48 -07:00 committed by GitHub
commit 398394056c
6 changed files with 176 additions and 387 deletions

20
.gitignore vendored
View File

@ -42,17 +42,17 @@
/data/*.cstore /data/*.cstore
/data/*.footer /data/*.footer
/sql/block_filtering.sql /sql/*block_filtering.sql
/sql/copyto.sql /sql/*copyto.sql
/sql/create.sql /sql/*create.sql
/sql/data_types.sql /sql/*data_types.sql
/sql/load.sql /sql/*load.sql
/expected/block_filtering.out /expected/*block_filtering.out
/expected/copyto.out /expected/*copyto.out
/expected/create.out /expected/*create.out
/expected/data_types.out /expected/*data_types.out
/expected/load.out /expected/*load.out
/results/* /results/*
/.deps/* /.deps/*
/regression.diffs /regression.diffs

View File

@ -77,7 +77,9 @@ typedef struct StripeMetadata
{ {
uint64 fileOffset; uint64 fileOffset;
uint64 dataLength; uint64 dataLength;
uint32 columnCount;
uint32 blockCount; uint32 blockCount;
uint32 blockRowCount;
uint64 rowCount; uint64 rowCount;
uint64 id; uint64 id;
} StripeMetadata; } StripeMetadata;
@ -128,20 +130,27 @@ typedef struct StripeSkipList
/* /*
* ColumnBlockData represents a block of data in a column. valueArray stores * BlockData represents a block of data for multiple columns. valueArray stores
* the values of data, and existsArray stores whether a value is present. * the values of data, and existsArray stores whether a value is present.
* valueBuffer is used to store (uncompressed) serialized values * valueBuffer is used to store (uncompressed) serialized values
* referenced by Datum's in valueArray. It is only used for by-reference Datum's. * referenced by Datum's in valueArray. It is only used for by-reference Datum's.
* There is a one-to-one correspondence between valueArray and existsArray. * There is a one-to-one correspondence between valueArray and existsArray.
*/ */
typedef struct ColumnBlockData typedef struct BlockData
{ {
bool *existsArray; uint32 rowCount;
Datum *valueArray; uint32 columnCount;
/*
* Following are indexed by [column][row]. If a column is not projected,
* then existsArray[column] and valueArray[column] are NULL.
*/
bool **existsArray;
Datum **valueArray;
/* valueBuffer keeps actual data for type-by-reference datums from valueArray. */ /* valueBuffer keeps actual data for type-by-reference datums from valueArray. */
StringInfo valueBuffer; StringInfo *valueBufferArray;
} ColumnBlockData; } BlockData;
/* /*
@ -178,25 +187,13 @@ typedef struct StripeBuffers
} StripeBuffers; } StripeBuffers;
/*
* StripeFooter represents a stripe's footer. In this footer, we keep three
* arrays of sizes. The number of elements in each of the arrays is equal
* to the number of columns.
*/
typedef struct StripeFooter
{
uint32 columnCount;
uint64 *existsSizeArray;
uint64 *valueSizeArray;
} StripeFooter;
/* TableReadState represents state of a cstore file read operation. */ /* TableReadState represents state of a cstore file read operation. */
typedef struct TableReadState typedef struct TableReadState
{ {
Oid relationId; Oid relationId;
TableMetadata *tableMetadata; TableMetadata *tableMetadata;
StripeMetadata *currentStripeMetadata;
TupleDesc tupleDescriptor; TupleDesc tupleDescriptor;
Relation relation; Relation relation;
@ -212,7 +209,7 @@ typedef struct TableReadState
StripeBuffers *stripeBuffers; StripeBuffers *stripeBuffers;
uint32 readStripeCount; uint32 readStripeCount;
uint64 stripeReadRowCount; uint64 stripeReadRowCount;
ColumnBlockData **blockDataArray; BlockData *blockData;
int32 deserializedBlockIndex; int32 deserializedBlockIndex;
} TableReadState; } TableReadState;
@ -233,7 +230,8 @@ typedef struct TableWriteState
StripeBuffers *stripeBuffers; StripeBuffers *stripeBuffers;
StripeSkipList *stripeSkipList; StripeSkipList *stripeSkipList;
uint32 stripeMaxRowCount; uint32 stripeMaxRowCount;
ColumnBlockData **blockDataArray; uint32 blockRowCount;
BlockData *blockData;
/* /*
* compressionBuffer buffer is used as temporary storage during * compressionBuffer buffer is used as temporary storage during
@ -276,19 +274,15 @@ extern void CStoreEndRead(TableReadState *state);
/* Function declarations for common functions */ /* Function declarations for common functions */
extern FmgrInfo * GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId, extern FmgrInfo * GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId,
int16 procedureId); int16 procedureId);
extern ColumnBlockData ** CreateEmptyBlockDataArray(uint32 columnCount, bool *columnMask, extern BlockData * CreateEmptyBlockData(uint32 columnCount, bool *columnMask,
uint32 blockRowCount); uint32 blockRowCount);
extern void FreeColumnBlockDataArray(ColumnBlockData **blockDataArray, extern void FreeBlockData(BlockData *blockData);
uint32 columnCount);
extern uint64 CStoreTableRowCount(Relation relation); extern uint64 CStoreTableRowCount(Relation relation);
extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer, extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer,
CompressionType compressionType); CompressionType compressionType);
extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType); extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType);
/* cstore_metadata_tables.c */ /* cstore_metadata_tables.c */
extern void SaveStripeFooter(Oid relid, uint64 stripe, StripeFooter *footer);
extern StripeFooter * ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount);
extern void InitCStoreTableMetadata(Oid relid, int blockRowCount); extern void InitCStoreTableMetadata(Oid relid, int blockRowCount);
extern void InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe); extern void InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe);
extern TableMetadata * ReadTableMetadata(Oid relid); extern TableMetadata * ReadTableMetadata(Oid relid);

View File

@ -73,7 +73,9 @@ CREATE TABLE cstore_stripes (
stripe bigint NOT NULL, stripe bigint NOT NULL,
file_offset bigint NOT NULL, file_offset bigint NOT NULL,
data_length bigint NOT NULL, data_length bigint NOT NULL,
column_count int NOT NULL,
block_count int NOT NULL, block_count int NOT NULL,
block_row_count int NOT NULL,
row_count bigint NOT NULL, row_count bigint NOT NULL,
PRIMARY KEY (relid, stripe), PRIMARY KEY (relid, stripe),
FOREIGN KEY (relid) REFERENCES cstore_tables(relid) ON DELETE CASCADE INITIALLY DEFERRED FOREIGN KEY (relid) REFERENCES cstore_tables(relid) ON DELETE CASCADE INITIALLY DEFERRED
@ -81,18 +83,6 @@ CREATE TABLE cstore_stripes (
COMMENT ON TABLE cstore_tables IS 'CStore per stripe metadata'; COMMENT ON TABLE cstore_tables IS 'CStore per stripe metadata';
CREATE TABLE cstore_stripe_attr (
relid oid NOT NULL,
stripe bigint NOT NULL,
attr int NOT NULL,
exists_size bigint NOT NULL,
value_size bigint NOT NULL,
PRIMARY KEY (relid, stripe, attr),
FOREIGN KEY (relid, stripe) REFERENCES cstore_stripes(relid, stripe) ON DELETE CASCADE INITIALLY DEFERRED
) WITH (user_catalog_table = true);
COMMENT ON TABLE cstore_tables IS 'CStore per stripe/column combination metadata';
CREATE TABLE cstore_skipnodes ( CREATE TABLE cstore_skipnodes (
relid oid NOT NULL, relid oid NOT NULL,
stripe bigint NOT NULL, stripe bigint NOT NULL,
@ -107,7 +97,7 @@ CREATE TABLE cstore_skipnodes (
exists_stream_length bigint NOT NULL, exists_stream_length bigint NOT NULL,
value_compression_type int NOT NULL, value_compression_type int NOT NULL,
PRIMARY KEY (relid, stripe, attr, block), PRIMARY KEY (relid, stripe, attr, block),
FOREIGN KEY (relid, stripe, attr) REFERENCES cstore_stripe_attr(relid, stripe, attr) ON DELETE CASCADE INITIALLY DEFERRED FOREIGN KEY (relid, stripe) REFERENCES cstore_stripes(relid, stripe) ON DELETE CASCADE INITIALLY DEFERRED
) WITH (user_catalog_table = true); ) WITH (user_catalog_table = true);
COMMENT ON TABLE cstore_tables IS 'CStore per block metadata'; COMMENT ON TABLE cstore_tables IS 'CStore per block metadata';

View File

@ -43,8 +43,6 @@ typedef struct
EState *estate; EState *estate;
} ModifyState; } ModifyState;
static Oid CStoreStripeAttrRelationId(void);
static Oid CStoreStripeAttrIndexRelationId(void);
static Oid CStoreStripesRelationId(void); static Oid CStoreStripesRelationId(void);
static Oid CStoreStripesIndexRelationId(void); static Oid CStoreStripesIndexRelationId(void);
static Oid CStoreTablesRelationId(void); static Oid CStoreTablesRelationId(void);
@ -63,14 +61,6 @@ static EState * create_estate_for_relation(Relation rel);
static bytea * DatumToBytea(Datum value, Form_pg_attribute attrForm); static bytea * DatumToBytea(Datum value, Form_pg_attribute attrForm);
static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm); static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm);
/* constants for cstore_stripe_attr */
#define Natts_cstore_stripe_attr 5
#define Anum_cstore_stripe_attr_relid 1
#define Anum_cstore_stripe_attr_stripe 2
#define Anum_cstore_stripe_attr_attr 3
#define Anum_cstore_stripe_attr_exists_size 4
#define Anum_cstore_stripe_attr_value_size 5
/* constants for cstore_table */ /* constants for cstore_table */
#define Natts_cstore_tables 4 #define Natts_cstore_tables 4
#define Anum_cstore_tables_relid 1 #define Anum_cstore_tables_relid 1
@ -79,13 +69,15 @@ static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm);
#define Anum_cstore_tables_version_minor 4 #define Anum_cstore_tables_version_minor 4
/* constants for cstore_stripe */ /* constants for cstore_stripe */
#define Natts_cstore_stripes 6 #define Natts_cstore_stripes 8
#define Anum_cstore_stripes_relid 1 #define Anum_cstore_stripes_relid 1
#define Anum_cstore_stripes_stripe 2 #define Anum_cstore_stripes_stripe 2
#define Anum_cstore_stripes_file_offset 3 #define Anum_cstore_stripes_file_offset 3
#define Anum_cstore_stripes_data_length 4 #define Anum_cstore_stripes_data_length 4
#define Anum_cstore_stripes_block_count 5 #define Anum_cstore_stripes_column_count 5
#define Anum_cstore_stripes_row_count 6 #define Anum_cstore_stripes_block_count 6
#define Anum_cstore_stripes_block_row_count 7
#define Anum_cstore_stripes_row_count 8
/* constants for cstore_skipnodes */ /* constants for cstore_skipnodes */
#define Natts_cstore_skipnodes 12 #define Natts_cstore_skipnodes 12
@ -327,7 +319,9 @@ InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe)
Int64GetDatum(stripe->id), Int64GetDatum(stripe->id),
Int64GetDatum(stripe->fileOffset), Int64GetDatum(stripe->fileOffset),
Int64GetDatum(stripe->dataLength), Int64GetDatum(stripe->dataLength),
Int32GetDatum(stripe->columnCount),
Int32GetDatum(stripe->blockCount), Int32GetDatum(stripe->blockCount),
Int32GetDatum(stripe->blockRowCount),
Int64GetDatum(stripe->rowCount) Int64GetDatum(stripe->rowCount)
}; };
@ -386,8 +380,12 @@ ReadTableMetadata(Oid relid)
datumArray[Anum_cstore_stripes_file_offset - 1]); datumArray[Anum_cstore_stripes_file_offset - 1]);
stripeMetadata->dataLength = DatumGetInt64( stripeMetadata->dataLength = DatumGetInt64(
datumArray[Anum_cstore_stripes_data_length - 1]); datumArray[Anum_cstore_stripes_data_length - 1]);
stripeMetadata->columnCount = DatumGetInt32(
datumArray[Anum_cstore_stripes_column_count - 1]);
stripeMetadata->blockCount = DatumGetInt32( stripeMetadata->blockCount = DatumGetInt32(
datumArray[Anum_cstore_stripes_block_count - 1]); datumArray[Anum_cstore_stripes_block_count - 1]);
stripeMetadata->blockRowCount = DatumGetInt32(
datumArray[Anum_cstore_stripes_block_row_count - 1]);
stripeMetadata->rowCount = DatumGetInt64( stripeMetadata->rowCount = DatumGetInt64(
datumArray[Anum_cstore_stripes_row_count - 1]); datumArray[Anum_cstore_stripes_row_count - 1]);
@ -481,103 +479,6 @@ DeleteTableMetadataRowIfExists(Oid relid)
} }
/*
* SaveStripeFooter stores give StripeFooter as cstore_stripe_attr records.
*/
void
SaveStripeFooter(Oid relid, uint64 stripe, StripeFooter *footer)
{
Oid cstoreStripeAttrOid = CStoreStripeAttrRelationId();
Relation cstoreStripeAttrs = heap_open(cstoreStripeAttrOid, RowExclusiveLock);
ModifyState *modifyState = StartModifyRelation(cstoreStripeAttrs);
for (AttrNumber attr = 1; attr <= footer->columnCount; attr++)
{
bool nulls[Natts_cstore_stripe_attr] = { 0 };
Datum values[Natts_cstore_stripe_attr] = {
ObjectIdGetDatum(relid),
Int64GetDatum(stripe),
Int16GetDatum(attr),
Int64GetDatum(footer->existsSizeArray[attr - 1]),
Int64GetDatum(footer->valueSizeArray[attr - 1])
};
InsertTupleAndEnforceConstraints(modifyState, values, nulls);
}
FinishModifyRelation(modifyState);
heap_close(cstoreStripeAttrs, NoLock);
}
/*
* ReadStripeFooter returns a StripeFooter by reading relevant records from
* cstore_stripe_attr.
*/
StripeFooter *
ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount)
{
StripeFooter *footer = NULL;
HeapTuple heapTuple;
Oid cstoreStripeAttrOid = CStoreStripeAttrRelationId();
Relation cstoreStripeAttrs = heap_open(cstoreStripeAttrOid, AccessShareLock);
Relation index = index_open(CStoreStripeAttrIndexRelationId(), AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(cstoreStripeAttrs);
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[2];
ScanKeyInit(&scanKey[0], Anum_cstore_stripe_attr_relid,
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relid));
ScanKeyInit(&scanKey[1], Anum_cstore_stripe_attr_stripe,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(stripe));
scanDescriptor = systable_beginscan_ordered(cstoreStripeAttrs, index, NULL, 2,
scanKey);
footer = palloc0(sizeof(StripeFooter));
footer->existsSizeArray = palloc0(relationColumnCount * sizeof(int64));
footer->valueSizeArray = palloc0(relationColumnCount * sizeof(int64));
/*
* Stripe can have less columns than the relation if ALTER TABLE happens
* after stripe is formed. So we calculate column count of a stripe as
* maximum attribute number for that stripe.
*/
footer->columnCount = 0;
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
{
Datum datumArray[Natts_cstore_stripe_attr];
bool isNullArray[Natts_cstore_stripe_attr];
AttrNumber attr = 0;
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
attr = DatumGetInt16(datumArray[2]);
footer->columnCount = Max(footer->columnCount, attr);
while (attr > relationColumnCount)
{
ereport(ERROR, (errmsg("unexpected attribute %d for a relation with %d attrs",
attr, relationColumnCount)));
}
footer->existsSizeArray[attr - 1] =
DatumGetInt64(datumArray[Anum_cstore_stripe_attr_exists_size - 1]);
footer->valueSizeArray[attr - 1] =
DatumGetInt64(datumArray[Anum_cstore_stripe_attr_value_size - 1]);
}
systable_endscan_ordered(scanDescriptor);
index_close(index, NoLock);
heap_close(cstoreStripeAttrs, NoLock);
return footer;
}
/* /*
* StartModifyRelation allocates resources for modifications. * StartModifyRelation allocates resources for modifications.
*/ */
@ -756,28 +657,6 @@ ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm)
} }
/*
* CStoreStripeAttrRelationId returns relation id of cstore_stripe_attr.
* TODO: should we cache this similar to citus?
*/
static Oid
CStoreStripeAttrRelationId(void)
{
return get_relname_relid("cstore_stripe_attr", CStoreNamespaceId());
}
/*
* CStoreStripeAttrRelationId returns relation id of cstore_stripe_attr_pkey.
* TODO: should we cache this similar to citus?
*/
static Oid
CStoreStripeAttrIndexRelationId(void)
{
return get_relname_relid("cstore_stripe_attr_pkey", CStoreNamespaceId());
}
/* /*
* CStoreStripesRelationId returns relation id of cstore_stripes. * CStoreStripesRelationId returns relation id of cstore_stripes.
* TODO: should we cache this similar to citus? * TODO: should we cache this similar to citus?

View File

@ -39,18 +39,16 @@
/* static function declarations */ /* static function declarations */
static StripeBuffers * LoadFilteredStripeBuffers(Relation relation, static StripeBuffers * LoadFilteredStripeBuffers(Relation relation,
StripeMetadata *stripeMetadata, StripeMetadata *stripeMetadata,
StripeFooter *stripeFooter,
TupleDesc tupleDescriptor, TupleDesc tupleDescriptor,
List *projectedColumnList, List *projectedColumnList,
List *whereClauseList); List *whereClauseList);
static void ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColumnList, static void ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColumnList,
uint64 blockIndex, uint64 blockRowIndex, uint64 blockIndex, uint64 blockRowIndex,
ColumnBlockData **blockDataArray, BlockData *blockData, Datum *columnValues,
Datum *columnValues, bool *columnNulls); bool *columnNulls);
static ColumnBuffers * LoadColumnBuffers(Relation relation, static ColumnBuffers * LoadColumnBuffers(Relation relation,
ColumnBlockSkipNode *blockSkipNodeArray, ColumnBlockSkipNode *blockSkipNodeArray,
uint32 blockCount, uint64 existsFileOffset, uint32 blockCount, uint64 stripeOffset,
uint64 valueFileOffset,
Form_pg_attribute attributeForm); Form_pg_attribute attributeForm);
static bool * SelectedBlockMask(StripeSkipList *stripeSkipList, static bool * SelectedBlockMask(StripeSkipList *stripeSkipList,
List *projectedColumnList, List *whereClauseList); List *projectedColumnList, List *whereClauseList);
@ -70,15 +68,12 @@ static void DeserializeDatumArray(StringInfo datumBuffer, bool *existsArray,
uint32 datumCount, bool datumTypeByValue, uint32 datumCount, bool datumTypeByValue,
int datumTypeLength, char datumTypeAlign, int datumTypeLength, char datumTypeAlign,
Datum *datumArray); Datum *datumArray);
static void DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex, static BlockData * DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex,
uint32 rowCount, ColumnBlockData **blockDataArray, uint32 rowCount, TupleDesc tupleDescriptor,
TupleDesc tupleDescriptor); List *projectedColumnList);
static Datum ColumnDefaultValue(TupleConstr *tupleConstraints, static Datum ColumnDefaultValue(TupleConstr *tupleConstraints,
Form_pg_attribute attributeForm); Form_pg_attribute attributeForm);
static StringInfo ReadFromSmgr(Relation rel, uint64 offset, uint32 size); static StringInfo ReadFromSmgr(Relation rel, uint64 offset, uint32 size);
static void ResetUncompressedBlockData(ColumnBlockData **blockDataArray,
uint32 columnCount);
/* /*
* CStoreBeginRead initializes a cstore read operation. This function returns a * CStoreBeginRead initializes a cstore read operation. This function returns a
@ -91,9 +86,6 @@ CStoreBeginRead(Oid relationId, TupleDesc tupleDescriptor,
TableReadState *readState = NULL; TableReadState *readState = NULL;
TableMetadata *tableMetadata = NULL; TableMetadata *tableMetadata = NULL;
MemoryContext stripeReadContext = NULL; MemoryContext stripeReadContext = NULL;
uint32 columnCount = 0;
bool *projectedColumnMask = NULL;
ColumnBlockData **blockDataArray = NULL;
tableMetadata = ReadTableMetadata(relationId); tableMetadata = ReadTableMetadata(relationId);
@ -106,11 +98,6 @@ CStoreBeginRead(Oid relationId, TupleDesc tupleDescriptor,
"Stripe Read Memory Context", "Stripe Read Memory Context",
ALLOCSET_DEFAULT_SIZES); ALLOCSET_DEFAULT_SIZES);
columnCount = tupleDescriptor->natts;
projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList);
blockDataArray = CreateEmptyBlockDataArray(columnCount, projectedColumnMask,
tableMetadata->blockRowCount);
readState = palloc0(sizeof(TableReadState)); readState = palloc0(sizeof(TableReadState));
readState->relationId = relationId; readState->relationId = relationId;
readState->tableMetadata = tableMetadata; readState->tableMetadata = tableMetadata;
@ -121,7 +108,7 @@ CStoreBeginRead(Oid relationId, TupleDesc tupleDescriptor,
readState->stripeReadRowCount = 0; readState->stripeReadRowCount = 0;
readState->tupleDescriptor = tupleDescriptor; readState->tupleDescriptor = tupleDescriptor;
readState->stripeReadContext = stripeReadContext; readState->stripeReadContext = stripeReadContext;
readState->blockDataArray = blockDataArray; readState->blockData = NULL;
readState->deserializedBlockIndex = -1; readState->deserializedBlockIndex = -1;
return readState; return readState;
@ -138,7 +125,7 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu
{ {
uint32 blockIndex = 0; uint32 blockIndex = 0;
uint32 blockRowIndex = 0; uint32 blockRowIndex = 0;
TableMetadata *tableMetadata = readState->tableMetadata; StripeMetadata *stripeMetadata = readState->currentStripeMetadata;
MemoryContext oldContext = NULL; MemoryContext oldContext = NULL;
/* /*
@ -151,9 +138,8 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu
{ {
StripeBuffers *stripeBuffers = NULL; StripeBuffers *stripeBuffers = NULL;
StripeMetadata *stripeMetadata = NULL; StripeMetadata *stripeMetadata = NULL;
List *stripeMetadataList = tableMetadata->stripeMetadataList; List *stripeMetadataList = readState->tableMetadata->stripeMetadataList;
uint32 stripeCount = list_length(stripeMetadataList); uint32 stripeCount = list_length(stripeMetadataList);
StripeFooter *stripeFooter = NULL;
/* if we have read all stripes, return false */ /* if we have read all stripes, return false */
if (readState->readStripeCount == stripeCount) if (readState->readStripeCount == stripeCount)
@ -163,18 +149,16 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu
oldContext = MemoryContextSwitchTo(readState->stripeReadContext); oldContext = MemoryContextSwitchTo(readState->stripeReadContext);
MemoryContextReset(readState->stripeReadContext); MemoryContextReset(readState->stripeReadContext);
readState->blockData = NULL;
stripeMetadata = list_nth(stripeMetadataList, readState->readStripeCount); stripeMetadata = list_nth(stripeMetadataList, readState->readStripeCount);
stripeFooter = ReadStripeFooter(readState->relationId,
stripeMetadata->id,
readState->tupleDescriptor->natts);
stripeBuffers = LoadFilteredStripeBuffers(readState->relation, stripeBuffers = LoadFilteredStripeBuffers(readState->relation,
stripeMetadata, stripeMetadata,
stripeFooter,
readState->tupleDescriptor, readState->tupleDescriptor,
readState->projectedColumnList, readState->projectedColumnList,
readState->whereClauseList); readState->whereClauseList);
readState->readStripeCount++; readState->readStripeCount++;
readState->currentStripeMetadata = stripeMetadata;
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
@ -183,37 +167,38 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu
readState->stripeBuffers = stripeBuffers; readState->stripeBuffers = stripeBuffers;
readState->stripeReadRowCount = 0; readState->stripeReadRowCount = 0;
readState->deserializedBlockIndex = -1; readState->deserializedBlockIndex = -1;
ResetUncompressedBlockData(readState->blockDataArray,
stripeBuffers->columnCount);
break; break;
} }
} }
blockIndex = readState->stripeReadRowCount / tableMetadata->blockRowCount; blockIndex = readState->stripeReadRowCount / stripeMetadata->blockRowCount;
blockRowIndex = readState->stripeReadRowCount % tableMetadata->blockRowCount; blockRowIndex = readState->stripeReadRowCount % stripeMetadata->blockRowCount;
if (blockIndex != readState->deserializedBlockIndex) if (blockIndex != readState->deserializedBlockIndex)
{ {
uint32 lastBlockIndex = 0; uint32 lastBlockIndex = 0;
uint32 blockRowCount = 0; uint32 blockRowCount = 0;
uint32 stripeRowCount = 0; uint32 stripeRowCount = 0;
StripeMetadata *stripeMetadata = readState->currentStripeMetadata;
stripeRowCount = readState->stripeBuffers->rowCount; stripeRowCount = stripeMetadata->rowCount;
lastBlockIndex = stripeRowCount / tableMetadata->blockRowCount; lastBlockIndex = stripeRowCount / stripeMetadata->blockRowCount;
if (blockIndex == lastBlockIndex) if (blockIndex == lastBlockIndex)
{ {
blockRowCount = stripeRowCount % tableMetadata->blockRowCount; blockRowCount = stripeRowCount % stripeMetadata->blockRowCount;
} }
else else
{ {
blockRowCount = tableMetadata->blockRowCount; blockRowCount = stripeMetadata->blockRowCount;
} }
oldContext = MemoryContextSwitchTo(readState->stripeReadContext); oldContext = MemoryContextSwitchTo(readState->stripeReadContext);
FreeBlockData(readState->blockData);
readState->blockData =
DeserializeBlockData(readState->stripeBuffers, blockIndex, DeserializeBlockData(readState->stripeBuffers, blockIndex,
blockRowCount, readState->blockDataArray, blockRowCount, readState->tupleDescriptor,
readState->tupleDescriptor); readState->projectedColumnList);
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
@ -221,7 +206,7 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu
} }
ReadStripeNextRow(readState->stripeBuffers, readState->projectedColumnList, ReadStripeNextRow(readState->stripeBuffers, readState->projectedColumnList,
blockIndex, blockRowIndex, readState->blockDataArray, blockIndex, blockRowIndex, readState->blockData,
columnValues, columnNulls); columnValues, columnNulls);
/* /*
@ -242,11 +227,8 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu
void void
CStoreEndRead(TableReadState *readState) CStoreEndRead(TableReadState *readState)
{ {
int columnCount = readState->tupleDescriptor->natts;
MemoryContextDelete(readState->stripeReadContext); MemoryContextDelete(readState->stripeReadContext);
list_free_deep(readState->tableMetadata->stripeMetadataList); list_free_deep(readState->tableMetadata->stripeMetadataList);
FreeColumnBlockDataArray(readState->blockDataArray, columnCount);
pfree(readState->tableMetadata); pfree(readState->tableMetadata);
pfree(readState); pfree(readState);
} }
@ -256,52 +238,65 @@ CStoreEndRead(TableReadState *readState)
* CreateEmptyBlockDataArray creates data buffers to keep deserialized exist and * CreateEmptyBlockDataArray creates data buffers to keep deserialized exist and
* value arrays for requested columns in columnMask. * value arrays for requested columns in columnMask.
*/ */
ColumnBlockData ** BlockData *
CreateEmptyBlockDataArray(uint32 columnCount, bool *columnMask, uint32 blockRowCount) CreateEmptyBlockData(uint32 columnCount, bool *columnMask, uint32 blockRowCount)
{ {
uint32 columnIndex = 0; uint32 columnIndex = 0;
ColumnBlockData **blockDataArray = palloc0(columnCount * sizeof(ColumnBlockData *));
BlockData *blockData = palloc0(sizeof(BlockData));
blockData->existsArray = palloc0(columnCount * sizeof(bool *));
blockData->valueArray = palloc0(columnCount * sizeof(Datum *));
blockData->valueBufferArray = palloc0(columnCount * sizeof(StringInfo));
blockData->columnCount = columnCount;
blockData->rowCount = blockRowCount;
/* allocate block memory for deserialized data */ /* allocate block memory for deserialized data */
for (columnIndex = 0; columnIndex < columnCount; columnIndex++) for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{ {
if (columnMask[columnIndex]) if (columnMask[columnIndex])
{ {
ColumnBlockData *blockData = palloc0(sizeof(ColumnBlockData)); blockData->existsArray[columnIndex] = palloc0(blockRowCount * sizeof(bool));
blockData->valueArray[columnIndex] = palloc0(blockRowCount * sizeof(Datum));
blockData->existsArray = palloc0(blockRowCount * sizeof(bool)); blockData->valueBufferArray[columnIndex] = NULL;
blockData->valueArray = palloc0(blockRowCount * sizeof(Datum));
blockData->valueBuffer = NULL;
blockDataArray[columnIndex] = blockData;
} }
} }
return blockDataArray; return blockData;
} }
/* /*
* FreeColumnBlockDataArray deallocates data buffers to keep deserialized exist and * FreeBlockData deallocates data buffers to keep deserialized exist and
* value arrays for requested columns in columnMask. * value arrays for requested columns in columnMask.
* ColumnBlockData->serializedValueBuffer lives in memory read/write context * ColumnBlockData->serializedValueBuffer lives in memory read/write context
* so it is deallocated automatically when the context is deleted. * so it is deallocated automatically when the context is deleted.
*/ */
void void
FreeColumnBlockDataArray(ColumnBlockData **blockDataArray, uint32 columnCount) FreeBlockData(BlockData *blockData)
{ {
uint32 columnIndex = 0; uint32 columnIndex = 0;
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
if (blockData == NULL)
{ {
ColumnBlockData *blockData = blockDataArray[columnIndex]; return;
if (blockData != NULL) }
for (columnIndex = 0; columnIndex < blockData->columnCount; columnIndex++)
{ {
pfree(blockData->existsArray); if (blockData->existsArray[columnIndex] != NULL)
pfree(blockData->valueArray); {
pfree(blockData); pfree(blockData->existsArray[columnIndex]);
}
if (blockData->valueArray[columnIndex] != NULL)
{
pfree(blockData->valueArray[columnIndex]);
} }
} }
pfree(blockDataArray); pfree(blockData->existsArray);
pfree(blockData->valueArray);
pfree(blockData);
} }
@ -332,12 +327,11 @@ CStoreTableRowCount(Relation relation)
*/ */
static StripeBuffers * static StripeBuffers *
LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata, LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
StripeFooter *stripeFooter, TupleDesc tupleDescriptor, TupleDesc tupleDescriptor, List *projectedColumnList,
List *projectedColumnList, List *whereClauseList) List *whereClauseList)
{ {
StripeBuffers *stripeBuffers = NULL; StripeBuffers *stripeBuffers = NULL;
ColumnBuffers **columnBuffersArray = NULL; ColumnBuffers **columnBuffersArray = NULL;
uint64 currentColumnFileOffset = 0;
uint32 columnIndex = 0; uint32 columnIndex = 0;
uint32 columnCount = tupleDescriptor->natts; uint32 columnCount = tupleDescriptor->natts;
@ -357,15 +351,9 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
/* load column data for projected columns */ /* load column data for projected columns */
columnBuffersArray = palloc0(columnCount * sizeof(ColumnBuffers *)); columnBuffersArray = palloc0(columnCount * sizeof(ColumnBuffers *));
currentColumnFileOffset = stripeMetadata->fileOffset;
for (columnIndex = 0; columnIndex < stripeFooter->columnCount; columnIndex++) for (columnIndex = 0; columnIndex < stripeMetadata->columnCount; columnIndex++)
{ {
uint64 existsSize = stripeFooter->existsSizeArray[columnIndex];
uint64 valueSize = stripeFooter->valueSizeArray[columnIndex];
uint64 existsFileOffset = currentColumnFileOffset;
uint64 valueFileOffset = currentColumnFileOffset + existsSize;
if (projectedColumnMask[columnIndex]) if (projectedColumnMask[columnIndex])
{ {
ColumnBlockSkipNode *blockSkipNode = ColumnBlockSkipNode *blockSkipNode =
@ -375,15 +363,11 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
ColumnBuffers *columnBuffers = LoadColumnBuffers(relation, blockSkipNode, ColumnBuffers *columnBuffers = LoadColumnBuffers(relation, blockSkipNode,
blockCount, blockCount,
existsFileOffset, stripeMetadata->fileOffset,
valueFileOffset,
attributeForm); attributeForm);
columnBuffersArray[columnIndex] = columnBuffers; columnBuffersArray[columnIndex] = columnBuffers;
} }
currentColumnFileOffset += existsSize;
currentColumnFileOffset += valueSize;
} }
stripeBuffers = palloc0(sizeof(StripeBuffers)); stripeBuffers = palloc0(sizeof(StripeBuffers));
@ -403,7 +387,7 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
static void static void
ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColumnList, ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColumnList,
uint64 blockIndex, uint64 blockRowIndex, uint64 blockIndex, uint64 blockRowIndex,
ColumnBlockData **blockDataArray, Datum *columnValues, BlockData *blockData, Datum *columnValues,
bool *columnNulls) bool *columnNulls)
{ {
ListCell *projectedColumnCell = NULL; ListCell *projectedColumnCell = NULL;
@ -414,13 +398,12 @@ ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColumnList,
foreach(projectedColumnCell, projectedColumnList) foreach(projectedColumnCell, projectedColumnList)
{ {
Var *projectedColumn = lfirst(projectedColumnCell); Var *projectedColumn = lfirst(projectedColumnCell);
uint32 projectedColumnIndex = projectedColumn->varattno - 1; uint32 columnIndex = projectedColumn->varattno - 1;
ColumnBlockData *blockData = blockDataArray[projectedColumnIndex];
if (blockData->existsArray[blockRowIndex]) if (blockData->existsArray[columnIndex][blockRowIndex])
{ {
columnValues[projectedColumnIndex] = blockData->valueArray[blockRowIndex]; columnValues[columnIndex] = blockData->valueArray[columnIndex][blockRowIndex];
columnNulls[projectedColumnIndex] = false; columnNulls[columnIndex] = false;
} }
} }
} }
@ -433,7 +416,7 @@ ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColumnList,
*/ */
static ColumnBuffers * static ColumnBuffers *
LoadColumnBuffers(Relation relation, ColumnBlockSkipNode *blockSkipNodeArray, LoadColumnBuffers(Relation relation, ColumnBlockSkipNode *blockSkipNodeArray,
uint32 blockCount, uint64 existsFileOffset, uint64 valueFileOffset, uint32 blockCount, uint64 stripeOffset,
Form_pg_attribute attributeForm) Form_pg_attribute attributeForm)
{ {
ColumnBuffers *columnBuffers = NULL; ColumnBuffers *columnBuffers = NULL;
@ -454,7 +437,7 @@ LoadColumnBuffers(Relation relation, ColumnBlockSkipNode *blockSkipNodeArray,
for (blockIndex = 0; blockIndex < blockCount; blockIndex++) for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
{ {
ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex]; ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex];
uint64 existsOffset = existsFileOffset + blockSkipNode->existsBlockOffset; uint64 existsOffset = stripeOffset + blockSkipNode->existsBlockOffset;
StringInfo rawExistsBuffer = ReadFromSmgr(relation, existsOffset, StringInfo rawExistsBuffer = ReadFromSmgr(relation, existsOffset,
blockSkipNode->existsLength); blockSkipNode->existsLength);
@ -466,7 +449,7 @@ LoadColumnBuffers(Relation relation, ColumnBlockSkipNode *blockSkipNodeArray,
{ {
ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex]; ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex];
CompressionType compressionType = blockSkipNode->valueCompressionType; CompressionType compressionType = blockSkipNode->valueCompressionType;
uint64 valueOffset = valueFileOffset + blockSkipNode->valueBlockOffset; uint64 valueOffset = stripeOffset + blockSkipNode->valueBlockOffset;
StringInfo rawValueBuffer = ReadFromSmgr(relation, valueOffset, StringInfo rawValueBuffer = ReadFromSmgr(relation, valueOffset,
blockSkipNode->valueLength); blockSkipNode->valueLength);
@ -919,20 +902,23 @@ DeserializeDatumArray(StringInfo datumBuffer, bool *existsArray, uint32 datumCou
* data is not present serialized buffer, then default value (or null) is used * data is not present serialized buffer, then default value (or null) is used
* to fill value array. * to fill value array.
*/ */
static void static BlockData *
DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex, DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex,
uint32 rowCount, uint32 rowCount, TupleDesc tupleDescriptor,
ColumnBlockData **blockDataArray, TupleDesc tupleDescriptor) List *projectedColumnList)
{ {
int columnIndex = 0; int columnIndex = 0;
bool *columnMask = ProjectedColumnMask(tupleDescriptor->natts, projectedColumnList);
BlockData *blockData = CreateEmptyBlockData(tupleDescriptor->natts, columnMask,
rowCount);
for (columnIndex = 0; columnIndex < stripeBuffers->columnCount; columnIndex++) for (columnIndex = 0; columnIndex < stripeBuffers->columnCount; columnIndex++)
{ {
ColumnBlockData *blockData = blockDataArray[columnIndex];
Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, columnIndex); Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, columnIndex);
ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex]; ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex];
bool columnAdded = false; bool columnAdded = false;
if ((columnBuffers == NULL) && (blockData != NULL)) if (columnBuffers == NULL && columnMask[columnIndex])
{ {
columnAdded = true; columnAdded = true;
} }
@ -943,10 +929,6 @@ DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex,
columnBuffers->blockBuffersArray[blockIndex]; columnBuffers->blockBuffersArray[blockIndex];
StringInfo valueBuffer = NULL; StringInfo valueBuffer = NULL;
/* free previous block's data buffers */
pfree(blockData->valueBuffer->data);
pfree(blockData->valueBuffer);
/* decompress and deserialize current block's data */ /* decompress and deserialize current block's data */
valueBuffer = DecompressBuffer(blockBuffers->valueBuffer, valueBuffer = DecompressBuffer(blockBuffers->valueBuffer,
blockBuffers->valueCompressionType); blockBuffers->valueCompressionType);
@ -958,15 +940,16 @@ DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex,
pfree(blockBuffers->valueBuffer); pfree(blockBuffers->valueBuffer);
} }
DeserializeBoolArray(blockBuffers->existsBuffer, blockData->existsArray, DeserializeBoolArray(blockBuffers->existsBuffer,
blockData->existsArray[columnIndex],
rowCount); rowCount);
DeserializeDatumArray(valueBuffer, blockData->existsArray, DeserializeDatumArray(valueBuffer, blockData->existsArray[columnIndex],
rowCount, attributeForm->attbyval, rowCount, attributeForm->attbyval,
attributeForm->attlen, attributeForm->attalign, attributeForm->attlen, attributeForm->attalign,
blockData->valueArray); blockData->valueArray[columnIndex]);
/* store current block's data buffer to be freed at next block read */ /* store current block's data buffer to be freed at next block read */
blockData->valueBuffer = valueBuffer; blockData->valueBufferArray[columnIndex] = valueBuffer;
} }
else if (columnAdded) else if (columnAdded)
{ {
@ -983,16 +966,19 @@ DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex,
for (rowIndex = 0; rowIndex < rowCount; rowIndex++) for (rowIndex = 0; rowIndex < rowCount; rowIndex++)
{ {
blockData->existsArray[rowIndex] = true; blockData->existsArray[columnIndex][rowIndex] = true;
blockData->valueArray[rowIndex] = defaultValue; blockData->valueArray[columnIndex][rowIndex] = defaultValue;
} }
} }
else else
{ {
memset(blockData->existsArray, false, rowCount); memset(blockData->existsArray[columnIndex], false,
rowCount * sizeof(bool));
} }
} }
} }
return blockData;
} }
@ -1067,23 +1053,3 @@ ReadFromSmgr(Relation rel, uint64 offset, uint32 size)
return resultBuffer; return resultBuffer;
} }
/*
* ResetUncompressedBlockData iterates over deserialized column block data
* and sets valueBuffer field to empty buffer. This field is allocated in stripe
* memory context and becomes invalid once memory context is reset.
*/
static void
ResetUncompressedBlockData(ColumnBlockData **blockDataArray, uint32 columnCount)
{
uint32 columnIndex = 0;
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
ColumnBlockData *blockData = blockDataArray[columnIndex];
if (blockData != NULL)
{
blockData->valueBuffer = makeStringInfo();
}
}
}

View File

@ -34,7 +34,6 @@ static StripeSkipList * CreateEmptyStripeSkipList(uint32 stripeMaxRowCount,
uint32 blockRowCount, uint32 blockRowCount,
uint32 columnCount); uint32 columnCount);
static StripeMetadata FlushStripe(TableWriteState *writeState); static StripeMetadata FlushStripe(TableWriteState *writeState);
static StripeFooter * CreateStripeFooter(StripeSkipList *stripeSkipList);
static StringInfo SerializeBoolArray(bool *boolArray, uint32 boolArrayLength); static StringInfo SerializeBoolArray(bool *boolArray, uint32 boolArrayLength);
static void SerializeSingleDatum(StringInfo datumBuffer, Datum datum, static void SerializeSingleDatum(StringInfo datumBuffer, Datum datum,
bool datumTypeByValue, int datumTypeLength, bool datumTypeByValue, int datumTypeLength,
@ -72,7 +71,7 @@ CStoreBeginWrite(Oid relationId,
uint32 columnCount = 0; uint32 columnCount = 0;
uint32 columnIndex = 0; uint32 columnIndex = 0;
bool *columnMaskArray = NULL; bool *columnMaskArray = NULL;
ColumnBlockData **blockData = NULL; BlockData *blockData = NULL;
uint64 currentStripeId = 0; uint64 currentStripeId = 0;
tableMetadata = ReadTableMetadata(relationId); tableMetadata = ReadTableMetadata(relationId);
@ -125,20 +124,21 @@ CStoreBeginWrite(Oid relationId,
columnMaskArray = palloc(columnCount * sizeof(bool)); columnMaskArray = palloc(columnCount * sizeof(bool));
memset(columnMaskArray, true, columnCount); memset(columnMaskArray, true, columnCount);
blockData = CreateEmptyBlockDataArray(columnCount, columnMaskArray, blockRowCount); blockData = CreateEmptyBlockData(columnCount, columnMaskArray, blockRowCount);
writeState = palloc0(sizeof(TableWriteState)); writeState = palloc0(sizeof(TableWriteState));
writeState->relationId = relationId; writeState->relationId = relationId;
writeState->tableMetadata = tableMetadata; writeState->tableMetadata = tableMetadata;
writeState->compressionType = compressionType; writeState->compressionType = compressionType;
writeState->stripeMaxRowCount = stripeMaxRowCount; writeState->stripeMaxRowCount = stripeMaxRowCount;
writeState->blockRowCount = blockRowCount;
writeState->tupleDescriptor = tupleDescriptor; writeState->tupleDescriptor = tupleDescriptor;
writeState->currentFileOffset = currentFileOffset; writeState->currentFileOffset = currentFileOffset;
writeState->comparisonFunctionArray = comparisonFunctionArray; writeState->comparisonFunctionArray = comparisonFunctionArray;
writeState->stripeBuffers = NULL; writeState->stripeBuffers = NULL;
writeState->stripeSkipList = NULL; writeState->stripeSkipList = NULL;
writeState->stripeWriteContext = stripeWriteContext; writeState->stripeWriteContext = stripeWriteContext;
writeState->blockDataArray = blockData; writeState->blockData = blockData;
writeState->compressionBuffer = NULL; writeState->compressionBuffer = NULL;
writeState->currentStripeId = currentStripeId; writeState->currentStripeId = currentStripeId;
@ -164,8 +164,8 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul
StripeSkipList *stripeSkipList = writeState->stripeSkipList; StripeSkipList *stripeSkipList = writeState->stripeSkipList;
uint32 columnCount = writeState->tupleDescriptor->natts; uint32 columnCount = writeState->tupleDescriptor->natts;
TableMetadata *tableMetadata = writeState->tableMetadata; TableMetadata *tableMetadata = writeState->tableMetadata;
const uint32 blockRowCount = tableMetadata->blockRowCount; const uint32 blockRowCount = writeState->blockRowCount;
ColumnBlockData **blockDataArray = writeState->blockDataArray; BlockData *blockData = writeState->blockData;
MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext); MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext);
if (stripeBuffers == NULL) if (stripeBuffers == NULL)
@ -184,8 +184,7 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul
*/ */
for (columnIndex = 0; columnIndex < columnCount; columnIndex++) for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{ {
ColumnBlockData *blockData = blockDataArray[columnIndex]; blockData->valueBufferArray[columnIndex] = makeStringInfo();
blockData->valueBuffer = makeStringInfo();
} }
} }
@ -194,14 +193,13 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul
for (columnIndex = 0; columnIndex < columnCount; columnIndex++) for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{ {
ColumnBlockData *blockData = blockDataArray[columnIndex];
ColumnBlockSkipNode **blockSkipNodeArray = stripeSkipList->blockSkipNodeArray; ColumnBlockSkipNode **blockSkipNodeArray = stripeSkipList->blockSkipNodeArray;
ColumnBlockSkipNode *blockSkipNode = ColumnBlockSkipNode *blockSkipNode =
&blockSkipNodeArray[columnIndex][blockIndex]; &blockSkipNodeArray[columnIndex][blockIndex];
if (columnNulls[columnIndex]) if (columnNulls[columnIndex])
{ {
blockData->existsArray[blockRowIndex] = false; blockData->existsArray[columnIndex][blockRowIndex] = false;
} }
else else
{ {
@ -214,10 +212,11 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul
Oid columnCollation = attributeForm->attcollation; Oid columnCollation = attributeForm->attcollation;
char columnTypeAlign = attributeForm->attalign; char columnTypeAlign = attributeForm->attalign;
blockData->existsArray[blockRowIndex] = true; blockData->existsArray[columnIndex][blockRowIndex] = true;
SerializeSingleDatum(blockData->valueBuffer, columnValues[columnIndex], SerializeSingleDatum(blockData->valueBufferArray[columnIndex],
columnTypeByValue, columnTypeLength, columnTypeAlign); columnValues[columnIndex], columnTypeByValue,
columnTypeLength, columnTypeAlign);
UpdateBlockSkipNodeMinMax(blockSkipNode, columnValues[columnIndex], UpdateBlockSkipNodeMinMax(blockSkipNode, columnValues[columnIndex],
columnTypeByValue, columnTypeLength, columnTypeByValue, columnTypeLength,
@ -271,7 +270,6 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul
void void
CStoreEndWrite(TableWriteState *writeState) CStoreEndWrite(TableWriteState *writeState)
{ {
int columnCount = writeState->tupleDescriptor->natts;
StripeBuffers *stripeBuffers = writeState->stripeBuffers; StripeBuffers *stripeBuffers = writeState->stripeBuffers;
if (stripeBuffers != NULL) if (stripeBuffers != NULL)
@ -289,7 +287,7 @@ CStoreEndWrite(TableWriteState *writeState)
MemoryContextDelete(writeState->stripeWriteContext); MemoryContextDelete(writeState->stripeWriteContext);
list_free_deep(writeState->tableMetadata->stripeMetadataList); list_free_deep(writeState->tableMetadata->stripeMetadataList);
pfree(writeState->comparisonFunctionArray); pfree(writeState->comparisonFunctionArray);
FreeColumnBlockDataArray(writeState->blockDataArray, columnCount); FreeBlockData(writeState->blockData);
pfree(writeState); pfree(writeState);
} }
@ -415,6 +413,8 @@ WriteToSmgr(TableWriteState *writeState, char *data, uint32 dataLength)
if (RelationNeedsWAL(rel)) if (RelationNeedsWAL(rel))
{ {
XLogRecPtr recptr = 0;
XLogBeginInsert(); XLogBeginInsert();
/* /*
@ -423,7 +423,7 @@ WriteToSmgr(TableWriteState *writeState, char *data, uint32 dataLength)
*/ */
XLogRegisterBuffer(0, buffer, REGBUF_FORCE_IMAGE); XLogRegisterBuffer(0, buffer, REGBUF_FORCE_IMAGE);
XLogRecPtr recptr = XLogInsert(RM_GENERIC_ID, 0); recptr = XLogInsert(RM_GENERIC_ID, 0);
PageSetLSN(page, recptr); PageSetLSN(page, recptr);
} }
@ -448,21 +448,19 @@ static StripeMetadata
FlushStripe(TableWriteState *writeState) FlushStripe(TableWriteState *writeState)
{ {
StripeMetadata stripeMetadata = { 0 }; StripeMetadata stripeMetadata = { 0 };
uint64 dataLength = 0;
StripeFooter *stripeFooter = NULL;
uint32 columnIndex = 0; uint32 columnIndex = 0;
uint32 blockIndex = 0; uint32 blockIndex = 0;
TableMetadata *tableMetadata = writeState->tableMetadata;
StripeBuffers *stripeBuffers = writeState->stripeBuffers; StripeBuffers *stripeBuffers = writeState->stripeBuffers;
StripeSkipList *stripeSkipList = writeState->stripeSkipList; StripeSkipList *stripeSkipList = writeState->stripeSkipList;
ColumnBlockSkipNode **columnSkipNodeArray = stripeSkipList->blockSkipNodeArray; ColumnBlockSkipNode **columnSkipNodeArray = stripeSkipList->blockSkipNodeArray;
TupleDesc tupleDescriptor = writeState->tupleDescriptor; TupleDesc tupleDescriptor = writeState->tupleDescriptor;
uint32 columnCount = tupleDescriptor->natts; uint32 columnCount = tupleDescriptor->natts;
uint32 blockCount = stripeSkipList->blockCount; uint32 blockCount = stripeSkipList->blockCount;
uint32 blockRowCount = tableMetadata->blockRowCount; uint32 blockRowCount = writeState->blockRowCount;
uint32 lastBlockIndex = stripeBuffers->rowCount / blockRowCount; uint32 lastBlockIndex = stripeBuffers->rowCount / blockRowCount;
uint32 lastBlockRowCount = stripeBuffers->rowCount % blockRowCount; uint32 lastBlockRowCount = stripeBuffers->rowCount % blockRowCount;
uint64 initialFileOffset = writeState->currentFileOffset; uint64 initialFileOffset = writeState->currentFileOffset;
uint64 stripeSize = 0;
/* /*
* check if the last block needs serialization , the last block was not serialized * check if the last block needs serialization , the last block was not serialized
@ -473,12 +471,10 @@ FlushStripe(TableWriteState *writeState)
SerializeBlockData(writeState, lastBlockIndex, lastBlockRowCount); SerializeBlockData(writeState, lastBlockIndex, lastBlockRowCount);
} }
/* update buffer sizes and positions in stripe skip list */ /* update buffer sizes in stripe skip list */
for (columnIndex = 0; columnIndex < columnCount; columnIndex++) for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{ {
ColumnBlockSkipNode *blockSkipNodeArray = columnSkipNodeArray[columnIndex]; ColumnBlockSkipNode *blockSkipNodeArray = columnSkipNodeArray[columnIndex];
uint64 currentExistsBlockOffset = 0;
uint64 currentValueBlockOffset = 0;
ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex]; ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex];
for (blockIndex = 0; blockIndex < blockCount; blockIndex++) for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
@ -486,26 +482,29 @@ FlushStripe(TableWriteState *writeState)
ColumnBlockBuffers *blockBuffers = ColumnBlockBuffers *blockBuffers =
columnBuffers->blockBuffersArray[blockIndex]; columnBuffers->blockBuffersArray[blockIndex];
uint64 existsBufferSize = blockBuffers->existsBuffer->len; uint64 existsBufferSize = blockBuffers->existsBuffer->len;
ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex];
blockSkipNode->existsBlockOffset = stripeSize;
blockSkipNode->existsLength = existsBufferSize;
stripeSize += existsBufferSize;
}
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
{
ColumnBlockBuffers *blockBuffers =
columnBuffers->blockBuffersArray[blockIndex];
uint64 valueBufferSize = blockBuffers->valueBuffer->len; uint64 valueBufferSize = blockBuffers->valueBuffer->len;
CompressionType valueCompressionType = blockBuffers->valueCompressionType; CompressionType valueCompressionType = blockBuffers->valueCompressionType;
ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex]; ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex];
blockSkipNode->existsBlockOffset = currentExistsBlockOffset; blockSkipNode->valueBlockOffset = stripeSize;
blockSkipNode->existsLength = existsBufferSize;
blockSkipNode->valueBlockOffset = currentValueBlockOffset;
blockSkipNode->valueLength = valueBufferSize; blockSkipNode->valueLength = valueBufferSize;
blockSkipNode->valueCompressionType = valueCompressionType; blockSkipNode->valueCompressionType = valueCompressionType;
currentExistsBlockOffset += existsBufferSize; stripeSize += valueBufferSize;
currentValueBlockOffset += valueBufferSize;
} }
} }
/* create skip list and footer buffers */
SaveStripeSkipList(writeState->relationId, writeState->currentStripeId,
stripeSkipList, tupleDescriptor);
stripeFooter = CreateStripeFooter(stripeSkipList);
/* /*
* Each stripe has only one section: * Each stripe has only one section:
* Data section, in which we store data for each column continuously. * Data section, in which we store data for each column continuously.
@ -543,17 +542,9 @@ FlushStripe(TableWriteState *writeState)
} }
} }
/* finally, we flush the footer buffer */ /* create skip list and footer buffers */
SaveStripeFooter(writeState->relationId, SaveStripeSkipList(writeState->relationId, writeState->currentStripeId,
writeState->currentStripeId, stripeSkipList, tupleDescriptor);
stripeFooter);
/* set stripe metadata */
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
dataLength += stripeFooter->existsSizeArray[columnIndex];
dataLength += stripeFooter->valueSizeArray[columnIndex];
}
for (blockIndex = 0; blockIndex < blockCount; blockIndex++) for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
{ {
@ -562,46 +553,16 @@ FlushStripe(TableWriteState *writeState)
} }
stripeMetadata.fileOffset = initialFileOffset; stripeMetadata.fileOffset = initialFileOffset;
stripeMetadata.dataLength = dataLength; stripeMetadata.dataLength = stripeSize;
stripeMetadata.id = writeState->currentStripeId; stripeMetadata.id = writeState->currentStripeId;
stripeMetadata.blockCount = blockCount; stripeMetadata.blockCount = blockCount;
stripeMetadata.blockRowCount = writeState->blockRowCount;
stripeMetadata.columnCount = columnCount;
return stripeMetadata; return stripeMetadata;
} }
/* Creates and returns the footer for given stripe. */
static StripeFooter *
CreateStripeFooter(StripeSkipList *stripeSkipList)
{
StripeFooter *stripeFooter = NULL;
uint32 columnIndex = 0;
uint32 columnCount = stripeSkipList->columnCount;
uint64 *existsSizeArray = palloc0(columnCount * sizeof(uint64));
uint64 *valueSizeArray = palloc0(columnCount * sizeof(uint64));
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
ColumnBlockSkipNode *blockSkipNodeArray =
stripeSkipList->blockSkipNodeArray[columnIndex];
uint32 blockIndex = 0;
for (blockIndex = 0; blockIndex < stripeSkipList->blockCount; blockIndex++)
{
existsSizeArray[columnIndex] += blockSkipNodeArray[blockIndex].existsLength;
valueSizeArray[columnIndex] += blockSkipNodeArray[blockIndex].valueLength;
}
}
stripeFooter = palloc0(sizeof(StripeFooter));
stripeFooter->columnCount = columnCount;
stripeFooter->existsSizeArray = existsSizeArray;
stripeFooter->valueSizeArray = valueSizeArray;
return stripeFooter;
}
/* /*
* SerializeBoolArray serializes the given boolean array and returns the result * SerializeBoolArray serializes the given boolean array and returns the result
* as a StringInfo. This function packs every 8 boolean values into one byte. * as a StringInfo. This function packs every 8 boolean values into one byte.
@ -679,7 +640,7 @@ SerializeBlockData(TableWriteState *writeState, uint32 blockIndex, uint32 rowCou
{ {
uint32 columnIndex = 0; uint32 columnIndex = 0;
StripeBuffers *stripeBuffers = writeState->stripeBuffers; StripeBuffers *stripeBuffers = writeState->stripeBuffers;
ColumnBlockData **blockDataArray = writeState->blockDataArray; BlockData *blockData = writeState->blockData;
CompressionType requestedCompressionType = writeState->compressionType; CompressionType requestedCompressionType = writeState->compressionType;
const uint32 columnCount = stripeBuffers->columnCount; const uint32 columnCount = stripeBuffers->columnCount;
StringInfo compressionBuffer = writeState->compressionBuffer; StringInfo compressionBuffer = writeState->compressionBuffer;
@ -689,9 +650,9 @@ SerializeBlockData(TableWriteState *writeState, uint32 blockIndex, uint32 rowCou
{ {
ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex]; ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex];
ColumnBlockBuffers *blockBuffers = columnBuffers->blockBuffersArray[blockIndex]; ColumnBlockBuffers *blockBuffers = columnBuffers->blockBuffersArray[blockIndex];
ColumnBlockData *blockData = blockDataArray[columnIndex];
blockBuffers->existsBuffer = SerializeBoolArray(blockData->existsArray, rowCount); blockBuffers->existsBuffer =
SerializeBoolArray(blockData->existsArray[columnIndex], rowCount);
} }
/* /*
@ -702,12 +663,11 @@ SerializeBlockData(TableWriteState *writeState, uint32 blockIndex, uint32 rowCou
{ {
ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex]; ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex];
ColumnBlockBuffers *blockBuffers = columnBuffers->blockBuffersArray[blockIndex]; ColumnBlockBuffers *blockBuffers = columnBuffers->blockBuffersArray[blockIndex];
ColumnBlockData *blockData = blockDataArray[columnIndex];
StringInfo serializedValueBuffer = NULL; StringInfo serializedValueBuffer = NULL;
CompressionType actualCompressionType = COMPRESSION_NONE; CompressionType actualCompressionType = COMPRESSION_NONE;
bool compressed = false; bool compressed = false;
serializedValueBuffer = blockData->valueBuffer; serializedValueBuffer = blockData->valueBufferArray[columnIndex];
/* the only other supported compression type is pg_lz for now */ /* the only other supported compression type is pg_lz for now */
Assert(requestedCompressionType == COMPRESSION_NONE || Assert(requestedCompressionType == COMPRESSION_NONE ||
@ -730,7 +690,7 @@ SerializeBlockData(TableWriteState *writeState, uint32 blockIndex, uint32 rowCou
blockBuffers->valueBuffer = CopyStringInfo(serializedValueBuffer); blockBuffers->valueBuffer = CopyStringInfo(serializedValueBuffer);
/* valueBuffer needs to be reset for next block's data */ /* valueBuffer needs to be reset for next block's data */
resetStringInfo(blockData->valueBuffer); resetStringInfo(blockData->valueBufferArray[columnIndex]);
} }
} }