Move skipnodes to metadata tables

merge-cstore-pykello
Hadi Moshayedi 2020-09-14 14:54:22 -07:00
parent c570932712
commit 2737686fd0
9 changed files with 342 additions and 590 deletions

View File

@ -6,10 +6,8 @@
MODULE_big = cstore_fdw
PG_CPPFLAGS = -std=c11
SHLIB_LINK = -lprotobuf-c
OBJS = cstore.pb-c.o cstore.o cstore_fdw.o cstore_writer.o cstore_reader.o \
cstore_metadata_serialization.o cstore_compression.o mod.o \
cstore_metadata_tables.o
OBJS = cstore.o cstore_fdw.o cstore_writer.o cstore_reader.o \
cstore_compression.o mod.o cstore_metadata_tables.o
EXTENSION = cstore_fdw
DATA = cstore_fdw--1.7.sql cstore_fdw--1.6--1.7.sql cstore_fdw--1.5--1.6.sql cstore_fdw--1.4--1.5.sql \
@ -51,9 +49,6 @@ ifeq (,$(findstring $(MAJORVERSION), 9.3 9.4 9.5 9.6 10 11 12))
$(error PostgreSQL 9.3 to 12 is required to compile this extension)
endif
cstore.pb-c.c: cstore.proto
protoc-c --c_out=. cstore.proto
installcheck: remove_cstore_files
remove_cstore_files:

View File

@ -81,9 +81,9 @@ typedef struct CStoreOptions
typedef struct StripeMetadata
{
uint64 fileOffset;
uint64 skipListLength;
uint64 dataLength;
uint64 footerLength;
uint32 blockCount;
uint64 rowCount;
uint64 id;
} StripeMetadata;
@ -191,7 +191,6 @@ typedef struct StripeBuffers
typedef struct StripeFooter
{
uint32 columnCount;
uint64 *skipListSizeArray;
uint64 *existsSizeArray;
uint64 *valueSizeArray;
} StripeFooter;
@ -293,6 +292,11 @@ extern StripeFooter * ReadStripeFooter(Oid relid, uint64 stripe, int relationCol
extern void InitCStoreTableMetadata(Oid relid, int blockRowCount);
extern void InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe);
extern TableMetadata * ReadTableMetadata(Oid relid);
extern void SaveStripeSkipList(Oid relid, uint64 stripe, StripeSkipList *stripeSkipList,
TupleDesc tupleDescriptor);
extern StripeSkipList * ReadStripeSkipList(Oid relid, uint64 stripe,
TupleDesc tupleDescriptor,
uint32 blockCount);
typedef struct SmgrAddr
{

View File

@ -1,24 +0,0 @@
syntax = "proto2";
package protobuf;
enum CompressionType {
// Values should match with the corresponding struct in cstore_fdw.h
NONE = 0;
PG_LZ = 1;
};
message ColumnBlockSkipNode {
optional uint64 rowCount = 1;
optional bytes minimumValue = 2;
optional bytes maximumValue = 3;
optional uint64 valueBlockOffset = 4;
optional uint64 valueLength = 5;
optional CompressionType valueCompressionType = 6;
optional uint64 existsBlockOffset = 7;
optional uint64 existsLength = 8;
}
message ColumnBlockSkipList {
repeated ColumnBlockSkipNode blockSkipNodeArray = 1;
}

View File

@ -68,23 +68,48 @@ CREATE TABLE cstore.cstore_tables (
PRIMARY KEY (relid)
) WITH (user_catalog_table = true);
COMMENT ON TABLE cstore.cstore_tables IS 'CStore table wide metadata';
CREATE TABLE cstore.cstore_stripes (
relid oid NOT NULL,
stripe bigint NOT NULL,
file_offset bigint NOT NULL,
skiplist_length bigint NOT NULL,
data_length bigint NOT NULL,
block_count int NOT NULL,
row_count bigint NOT NULL,
PRIMARY KEY (relid, stripe),
FOREIGN KEY (relid) REFERENCES cstore.cstore_tables(relid) ON DELETE CASCADE INITIALLY DEFERRED
) WITH (user_catalog_table = true);
COMMENT ON TABLE cstore.cstore_tables IS 'CStore per stripe metadata';
CREATE TABLE cstore.cstore_stripe_attr (
relid oid NOT NULL,
stripe bigint NOT NULL,
attr int NOT NULL,
exists_size bigint NOT NULL,
value_size bigint NOT NULL,
skiplist_size bigint NOT NULL,
PRIMARY KEY (relid, stripe, attr),
FOREIGN KEY (relid, stripe) REFERENCES cstore.cstore_stripes(relid, stripe) ON DELETE CASCADE INITIALLY DEFERRED
) WITH (user_catalog_table = true);
COMMENT ON TABLE cstore.cstore_tables IS 'CStore per stripe/column combination metadata';
CREATE TABLE cstore.cstore_skipnodes (
relid oid NOT NULL,
stripe bigint NOT NULL,
attr int NOT NULL,
block int NOT NULL,
row_count bigint NOT NULL,
minimum_value bytea,
maximum_value bytea,
value_stream_offset bigint NOT NULL,
value_stream_length bigint NOT NULL,
exists_stream_offset bigint NOT NULL,
exists_stream_length bigint NOT NULL,
value_compression_type int NOT NULL,
PRIMARY KEY (relid, stripe, attr, block),
FOREIGN KEY (relid, stripe, attr) REFERENCES cstore.cstore_stripe_attr(relid, stripe, attr) ON DELETE CASCADE INITIALLY DEFERRED
) WITH (user_catalog_table = true);
COMMENT ON TABLE cstore.cstore_tables IS 'CStore per block metadata';

View File

@ -1,302 +0,0 @@
/*-------------------------------------------------------------------------
*
* cstore_metadata_serialization.c
*
* This file contains function definitions for serializing/deserializing cstore
* metadata.
*
* Copyright (c) 2016, Citus Data, Inc.
*
* $Id$
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/tupmacs.h"
#include "cstore.h"
#include "cstore_metadata_serialization.h"
#include "cstore.pb-c.h"
/* local functions forward declarations */
static ProtobufCBinaryData DatumToProtobufBinary(Datum datum, bool typeByValue,
int typeLength);
static Datum ProtobufBinaryToDatum(ProtobufCBinaryData protobufBinary,
bool typeByValue, int typeLength);
/*
* SerializeColumnSkipList serializes a column skip list, where the colum skip
* list includes all block skip nodes for that column. The function then returns
* the result as a string info.
*/
StringInfo
SerializeColumnSkipList(ColumnBlockSkipNode *blockSkipNodeArray, uint32 blockCount,
bool typeByValue, int typeLength)
{
StringInfo blockSkipListBuffer = NULL;
Protobuf__ColumnBlockSkipList protobufBlockSkipList =
PROTOBUF__COLUMN_BLOCK_SKIP_LIST__INIT;
Protobuf__ColumnBlockSkipNode **protobufBlockSkipNodeArray = NULL;
uint32 blockIndex = 0;
uint8 *blockSkipListData = NULL;
uint32 blockSkipListSize = 0;
protobufBlockSkipNodeArray = palloc0(blockCount *
sizeof(Protobuf__ColumnBlockSkipNode *));
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
{
ColumnBlockSkipNode blockSkipNode = blockSkipNodeArray[blockIndex];
Protobuf__ColumnBlockSkipNode *protobufBlockSkipNode = NULL;
ProtobufCBinaryData binaryMinimumValue = { 0, 0 };
ProtobufCBinaryData binaryMaximumValue = { 0, 0 };
if (blockSkipNode.hasMinMax)
{
binaryMinimumValue = DatumToProtobufBinary(blockSkipNode.minimumValue,
typeByValue, typeLength);
binaryMaximumValue = DatumToProtobufBinary(blockSkipNode.maximumValue,
typeByValue, typeLength);
}
protobufBlockSkipNode = palloc0(sizeof(Protobuf__ColumnBlockSkipNode));
protobuf__column_block_skip_node__init(protobufBlockSkipNode);
protobufBlockSkipNode->has_rowcount = true;
protobufBlockSkipNode->rowcount = blockSkipNode.rowCount;
protobufBlockSkipNode->has_minimumvalue = blockSkipNode.hasMinMax;
protobufBlockSkipNode->minimumvalue = binaryMinimumValue;
protobufBlockSkipNode->has_maximumvalue = blockSkipNode.hasMinMax;
protobufBlockSkipNode->maximumvalue = binaryMaximumValue;
protobufBlockSkipNode->has_valueblockoffset = true;
protobufBlockSkipNode->valueblockoffset = blockSkipNode.valueBlockOffset;
protobufBlockSkipNode->has_valuelength = true;
protobufBlockSkipNode->valuelength = blockSkipNode.valueLength;
protobufBlockSkipNode->has_existsblockoffset = true;
protobufBlockSkipNode->existsblockoffset = blockSkipNode.existsBlockOffset;
protobufBlockSkipNode->has_existslength = true;
protobufBlockSkipNode->existslength = blockSkipNode.existsLength;
protobufBlockSkipNode->has_valuecompressiontype = true;
protobufBlockSkipNode->valuecompressiontype =
(Protobuf__CompressionType) blockSkipNode.valueCompressionType;
protobufBlockSkipNodeArray[blockIndex] = protobufBlockSkipNode;
}
protobufBlockSkipList.n_blockskipnodearray = blockCount;
protobufBlockSkipList.blockskipnodearray = protobufBlockSkipNodeArray;
blockSkipListSize =
protobuf__column_block_skip_list__get_packed_size(&protobufBlockSkipList);
blockSkipListData = palloc0(blockSkipListSize);
protobuf__column_block_skip_list__pack(&protobufBlockSkipList, blockSkipListData);
blockSkipListBuffer = palloc0(sizeof(StringInfoData));
blockSkipListBuffer->len = blockSkipListSize;
blockSkipListBuffer->maxlen = blockSkipListSize;
blockSkipListBuffer->data = (char *) blockSkipListData;
return blockSkipListBuffer;
}
/*
* DeserializeBlockCount deserializes the given column skip list buffer and
* returns the number of blocks in column skip list.
*/
uint32
DeserializeBlockCount(StringInfo buffer)
{
uint32 blockCount = 0;
Protobuf__ColumnBlockSkipList *protobufBlockSkipList = NULL;
protobufBlockSkipList =
protobuf__column_block_skip_list__unpack(NULL, buffer->len,
(uint8 *) buffer->data);
if (protobufBlockSkipList == NULL)
{
ereport(ERROR, (errmsg("could not unpack column store"),
errdetail("invalid skip list buffer")));
}
blockCount = protobufBlockSkipList->n_blockskipnodearray;
protobuf__column_block_skip_list__free_unpacked(protobufBlockSkipList, NULL);
return blockCount;
}
/*
* DeserializeRowCount deserializes the given column skip list buffer and
* returns the total number of rows in block skip list.
*/
uint32
DeserializeRowCount(StringInfo buffer)
{
uint32 rowCount = 0;
Protobuf__ColumnBlockSkipList *protobufBlockSkipList = NULL;
uint32 blockIndex = 0;
uint32 blockCount = 0;
protobufBlockSkipList =
protobuf__column_block_skip_list__unpack(NULL, buffer->len,
(uint8 *) buffer->data);
if (protobufBlockSkipList == NULL)
{
ereport(ERROR, (errmsg("could not unpack column store"),
errdetail("invalid skip list buffer")));
}
blockCount = (uint32) protobufBlockSkipList->n_blockskipnodearray;
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
{
Protobuf__ColumnBlockSkipNode *protobufBlockSkipNode =
protobufBlockSkipList->blockskipnodearray[blockIndex];
rowCount += protobufBlockSkipNode->rowcount;
}
protobuf__column_block_skip_list__free_unpacked(protobufBlockSkipList, NULL);
return rowCount;
}
/*
* DeserializeColumnSkipList deserializes the given buffer and returns the result as
* a ColumnBlockSkipNode array. If the number of unpacked block skip nodes are not
* equal to the given block count function errors out.
*/
ColumnBlockSkipNode *
DeserializeColumnSkipList(StringInfo buffer, bool typeByValue, int typeLength,
uint32 blockCount)
{
ColumnBlockSkipNode *blockSkipNodeArray = NULL;
uint32 blockIndex = 0;
Protobuf__ColumnBlockSkipList *protobufBlockSkipList = NULL;
protobufBlockSkipList =
protobuf__column_block_skip_list__unpack(NULL, buffer->len,
(uint8 *) buffer->data);
if (protobufBlockSkipList == NULL)
{
ereport(ERROR, (errmsg("could not unpack column store"),
errdetail("invalid skip list buffer")));
}
if (protobufBlockSkipList->n_blockskipnodearray != blockCount)
{
ereport(ERROR, (errmsg("could not unpack column store"),
errdetail("block skip node count and block count don't match")));
}
blockSkipNodeArray = palloc0(blockCount * sizeof(ColumnBlockSkipNode));
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
{
Protobuf__ColumnBlockSkipNode *protobufBlockSkipNode = NULL;
ColumnBlockSkipNode *blockSkipNode = NULL;
bool hasMinMax = false;
Datum minimumValue = 0;
Datum maximumValue = 0;
protobufBlockSkipNode = protobufBlockSkipList->blockskipnodearray[blockIndex];
if (!protobufBlockSkipNode->has_rowcount ||
!protobufBlockSkipNode->has_existsblockoffset ||
!protobufBlockSkipNode->has_valueblockoffset ||
!protobufBlockSkipNode->has_existslength ||
!protobufBlockSkipNode->has_valuelength ||
!protobufBlockSkipNode->has_valuecompressiontype)
{
ereport(ERROR, (errmsg("could not unpack column store"),
errdetail("missing required block skip node metadata")));
}
if (protobufBlockSkipNode->has_minimumvalue !=
protobufBlockSkipNode->has_maximumvalue)
{
ereport(ERROR, (errmsg("could not unpack column store"),
errdetail("has minimum and has maximum fields "
"don't match")));
}
hasMinMax = protobufBlockSkipNode->has_minimumvalue;
if (hasMinMax)
{
minimumValue = ProtobufBinaryToDatum(protobufBlockSkipNode->minimumvalue,
typeByValue, typeLength);
maximumValue = ProtobufBinaryToDatum(protobufBlockSkipNode->maximumvalue,
typeByValue, typeLength);
}
blockSkipNode = &blockSkipNodeArray[blockIndex];
blockSkipNode->rowCount = protobufBlockSkipNode->rowcount;
blockSkipNode->hasMinMax = hasMinMax;
blockSkipNode->minimumValue = minimumValue;
blockSkipNode->maximumValue = maximumValue;
blockSkipNode->existsBlockOffset = protobufBlockSkipNode->existsblockoffset;
blockSkipNode->valueBlockOffset = protobufBlockSkipNode->valueblockoffset;
blockSkipNode->existsLength = protobufBlockSkipNode->existslength;
blockSkipNode->valueLength = protobufBlockSkipNode->valuelength;
blockSkipNode->valueCompressionType =
(CompressionType) protobufBlockSkipNode->valuecompressiontype;
}
protobuf__column_block_skip_list__free_unpacked(protobufBlockSkipList, NULL);
return blockSkipNodeArray;
}
/* Converts a datum to a ProtobufCBinaryData. */
static ProtobufCBinaryData
DatumToProtobufBinary(Datum datum, bool datumTypeByValue, int datumTypeLength)
{
ProtobufCBinaryData protobufBinary = { 0, 0 };
int datumLength = att_addlength_datum(0, datumTypeLength, datum);
char *datumBuffer = palloc0(datumLength);
if (datumTypeLength > 0)
{
if (datumTypeByValue)
{
store_att_byval(datumBuffer, datum, datumTypeLength);
}
else
{
memcpy(datumBuffer, DatumGetPointer(datum), datumTypeLength);
}
}
else
{
memcpy(datumBuffer, DatumGetPointer(datum), datumLength);
}
protobufBinary.data = (uint8 *) datumBuffer;
protobufBinary.len = datumLength;
return protobufBinary;
}
/* Converts the given ProtobufCBinaryData to a Datum. */
static Datum
ProtobufBinaryToDatum(ProtobufCBinaryData protobufBinary, bool datumTypeByValue,
int datumTypeLength)
{
Datum datum = 0;
/*
* We copy the protobuf data so the result of this function lives even
* after the unpacked protobuf struct is freed.
*/
char *binaryDataCopy = palloc0(protobufBinary.len);
memcpy(binaryDataCopy, protobufBinary.data, protobufBinary.len);
datum = fetch_att(binaryDataCopy, datumTypeByValue, datumTypeLength);
return datum;
}

View File

@ -1,31 +0,0 @@
/*-------------------------------------------------------------------------
*
* cstore_metadata_serialization.h
*
* Type and function declarations to serialize/deserialize cstore metadata.
*
* Copyright (c) 2016, Citus Data, Inc.
*
* $Id$
*
*-------------------------------------------------------------------------
*/
#ifndef CSTORE_SERIALIZATION_H
#define CSTORE_SERIALIZATION_H
/* Function declarations for metadata serialization */
extern StringInfo SerializeColumnSkipList(ColumnBlockSkipNode *blockSkipNodeArray,
uint32 blockCount, bool typeByValue,
int typeLength);
/* Function declarations for metadata deserialization */
extern void DeserializePostScript(StringInfo buffer, uint64 *tableFooterLength);
extern uint32 DeserializeBlockCount(StringInfo buffer);
extern uint32 DeserializeRowCount(StringInfo buffer);
extern ColumnBlockSkipNode * DeserializeColumnSkipList(StringInfo buffer,
bool typeByValue, int typeLength,
uint32 blockCount);
#endif /* CSTORE_SERIALIZATION_H */

View File

@ -31,13 +31,12 @@
#include "lib/stringinfo.h"
#include "port.h"
#include "storage/fd.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/memutils.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "cstore_metadata_serialization.h"
typedef struct
{
Relation rel;
@ -50,6 +49,8 @@ static Oid CStoreStripesRelationId(void);
static Oid CStoreStripesIndexRelationId(void);
static Oid CStoreTablesRelationId(void);
static Oid CStoreTablesIndexRelationId(void);
static Oid CStoreSkipNodesRelationId(void);
static Oid CStoreSkipNodesIndexRelationId(void);
static Oid CStoreNamespaceId(void);
static int TableBlockRowCount(Oid relid);
static void DeleteTableMetadataRowIfExists(Oid relid);
@ -59,15 +60,16 @@ static void InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values,
static void DeleteTupleAndEnforceConstraints(ModifyState *state, HeapTuple heapTuple);
static void FinishModifyRelation(ModifyState *state);
static EState * create_estate_for_relation(Relation rel);
static bytea * DatumToBytea(Datum value, Form_pg_attribute attrForm);
static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm);
/* constants for cstore_stripe_attr */
#define Natts_cstore_stripe_attr 6
#define Natts_cstore_stripe_attr 5
#define Anum_cstore_stripe_attr_relid 1
#define Anum_cstore_stripe_attr_stripe 2
#define Anum_cstore_stripe_attr_attr 3
#define Anum_cstore_stripe_attr_exists_size 4
#define Anum_cstore_stripe_attr_value_size 5
#define Anum_cstore_stripe_attr_skiplist_size 6
/* constants for cstore_table */
#define Natts_cstore_tables 4
@ -77,12 +79,29 @@ static EState * create_estate_for_relation(Relation rel);
#define Anum_cstore_tables_version_minor 4
/* constants for cstore_stripe */
#define Natts_cstore_stripes 5
#define Natts_cstore_stripes 6
#define Anum_cstore_stripes_relid 1
#define Anum_cstore_stripes_stripe 2
#define Anum_cstore_stripes_file_offset 3
#define Anum_cstore_stripes_skiplist_length 4
#define Anum_cstore_stripes_data_length 5
#define Anum_cstore_stripes_data_length 4
#define Anum_cstore_stripes_block_count 5
#define Anum_cstore_stripes_row_count 6
/* constants for cstore_skipnodes */
#define Natts_cstore_skipnodes 12
#define Anum_cstore_skipnodes_relid 1
#define Anum_cstore_skipnodes_stripe 2
#define Anum_cstore_skipnodes_attr 3
#define Anum_cstore_skipnodes_block 4
#define Anum_cstore_skipnodes_row_count 5
#define Anum_cstore_skipnodes_minimum_value 6
#define Anum_cstore_skipnodes_maximum_value 7
#define Anum_cstore_skipnodes_value_stream_offset 8
#define Anum_cstore_skipnodes_value_stream_length 9
#define Anum_cstore_skipnodes_exists_stream_offset 10
#define Anum_cstore_skipnodes_exists_stream_length 11
#define Anum_cstore_skipnodes_value_compression_type 12
/*
* InitCStoreTableMetadata adds a record for the given relation in cstore_table.
@ -117,6 +136,185 @@ InitCStoreTableMetadata(Oid relid, int blockRowCount)
}
/*
* SaveStripeSkipList saves StripeSkipList for a given stripe as rows
* of cstore_skipnodes.
*/
void
SaveStripeSkipList(Oid relid, uint64 stripe, StripeSkipList *stripeSkipList,
TupleDesc tupleDescriptor)
{
uint32 columnIndex = 0;
uint32 blockIndex = 0;
Oid cstoreSkipNodesOid = InvalidOid;
Relation cstoreSkipNodes = NULL;
ModifyState *modifyState = NULL;
uint32 columnCount = stripeSkipList->columnCount;
cstoreSkipNodesOid = CStoreSkipNodesRelationId();
cstoreSkipNodes = heap_open(cstoreSkipNodesOid, RowExclusiveLock);
modifyState = StartModifyRelation(cstoreSkipNodes);
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
for (blockIndex = 0; blockIndex < stripeSkipList->blockCount; blockIndex++)
{
ColumnBlockSkipNode *skipNode =
&stripeSkipList->blockSkipNodeArray[columnIndex][blockIndex];
Datum values[Natts_cstore_skipnodes] = {
ObjectIdGetDatum(relid),
Int64GetDatum(stripe),
Int32GetDatum(columnIndex + 1),
Int32GetDatum(blockIndex),
Int64GetDatum(skipNode->rowCount),
0, /* to be filled below */
0, /* to be filled below */
Int64GetDatum(skipNode->valueBlockOffset),
Int64GetDatum(skipNode->valueLength),
Int64GetDatum(skipNode->existsBlockOffset),
Int64GetDatum(skipNode->existsLength),
Int32GetDatum(skipNode->valueCompressionType)
};
bool nulls[Natts_cstore_skipnodes] = { false };
if (skipNode->hasMinMax)
{
values[Anum_cstore_skipnodes_minimum_value - 1] =
PointerGetDatum(DatumToBytea(skipNode->minimumValue,
&tupleDescriptor->attrs[columnIndex]));
values[Anum_cstore_skipnodes_maximum_value - 1] =
PointerGetDatum(DatumToBytea(skipNode->maximumValue,
&tupleDescriptor->attrs[columnIndex]));
}
else
{
nulls[Anum_cstore_skipnodes_minimum_value - 1] = true;
nulls[Anum_cstore_skipnodes_maximum_value - 1] = true;
}
InsertTupleAndEnforceConstraints(modifyState, values, nulls);
}
}
FinishModifyRelation(modifyState);
heap_close(cstoreSkipNodes, NoLock);
CommandCounterIncrement();
}
/*
* ReadStripeSkipList fetches StripeSkipList for a given stripe.
*/
StripeSkipList *
ReadStripeSkipList(Oid relid, uint64 stripe, TupleDesc tupleDescriptor,
uint32 blockCount)
{
StripeSkipList *skipList = NULL;
uint32 columnIndex = 0;
Oid cstoreSkipNodesOid = InvalidOid;
Relation cstoreSkipNodes = NULL;
Relation index = NULL;
HeapTuple heapTuple = NULL;
uint32 columnCount = tupleDescriptor->natts;
ScanKeyData scanKey[2];
SysScanDesc scanDescriptor = NULL;
cstoreSkipNodesOid = CStoreSkipNodesRelationId();
cstoreSkipNodes = heap_open(cstoreSkipNodesOid, AccessShareLock);
index = index_open(CStoreSkipNodesIndexRelationId(), AccessShareLock);
ScanKeyInit(&scanKey[0], Anum_cstore_skipnodes_relid,
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relid));
ScanKeyInit(&scanKey[1], Anum_cstore_skipnodes_stripe,
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe));
scanDescriptor = systable_beginscan_ordered(cstoreSkipNodes, index, NULL, 2, scanKey);
skipList = palloc0(sizeof(StripeSkipList));
skipList->blockCount = blockCount;
skipList->columnCount = columnCount;
skipList->blockSkipNodeArray = palloc0(columnCount * sizeof(ColumnBlockSkipNode *));
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
skipList->blockSkipNodeArray[columnIndex] =
palloc0(blockCount * sizeof(ColumnBlockSkipNode));
}
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
{
uint32 attr = 0;
uint32 blockIndex = 0;
ColumnBlockSkipNode *skipNode = NULL;
Datum datumArray[Natts_cstore_skipnodes];
bool isNullArray[Natts_cstore_skipnodes];
heap_deform_tuple(heapTuple, RelationGetDescr(cstoreSkipNodes), datumArray,
isNullArray);
attr = DatumGetInt32(datumArray[Anum_cstore_skipnodes_attr - 1]);
blockIndex = DatumGetInt32(datumArray[Anum_cstore_skipnodes_block - 1]);
if (attr <= 0 || attr > columnCount)
{
ereport(ERROR, (errmsg("invalid stripe skipnode entry"),
errdetail("Attribute number out of range: %u", attr)));
}
if (blockIndex < 0 || blockIndex >= blockCount)
{
ereport(ERROR, (errmsg("invalid stripe skipnode entry"),
errdetail("Block number out of range: %u", blockIndex)));
}
columnIndex = attr - 1;
skipNode = &skipList->blockSkipNodeArray[columnIndex][blockIndex];
skipNode->rowCount = DatumGetInt64(datumArray[Anum_cstore_skipnodes_row_count -
1]);
skipNode->valueBlockOffset =
DatumGetInt64(datumArray[Anum_cstore_skipnodes_value_stream_offset - 1]);
skipNode->valueLength =
DatumGetInt64(datumArray[Anum_cstore_skipnodes_value_stream_length - 1]);
skipNode->existsBlockOffset =
DatumGetInt64(datumArray[Anum_cstore_skipnodes_exists_stream_offset - 1]);
skipNode->existsLength =
DatumGetInt64(datumArray[Anum_cstore_skipnodes_exists_stream_length - 1]);
skipNode->valueCompressionType =
DatumGetInt32(datumArray[Anum_cstore_skipnodes_value_compression_type - 1]);
if (isNullArray[Anum_cstore_skipnodes_minimum_value - 1] ||
isNullArray[Anum_cstore_skipnodes_maximum_value - 1])
{
skipNode->hasMinMax = false;
}
else
{
bytea *minValue = DatumGetByteaP(
datumArray[Anum_cstore_skipnodes_minimum_value - 1]);
bytea *maxValue = DatumGetByteaP(
datumArray[Anum_cstore_skipnodes_maximum_value - 1]);
skipNode->minimumValue =
ByteaToDatum(minValue, &tupleDescriptor->attrs[columnIndex]);
skipNode->maximumValue =
ByteaToDatum(maxValue, &tupleDescriptor->attrs[columnIndex]);
skipNode->hasMinMax = true;
}
}
systable_endscan_ordered(scanDescriptor);
index_close(index, NoLock);
heap_close(cstoreSkipNodes, NoLock);
return skipList;
}
/*
* InsertStripeMetadataRow adds a row to cstore_stripes.
*/
@ -128,8 +326,9 @@ InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe)
ObjectIdGetDatum(relid),
Int64GetDatum(stripe->id),
Int64GetDatum(stripe->fileOffset),
Int64GetDatum(stripe->skipListLength),
Int64GetDatum(stripe->dataLength)
Int64GetDatum(stripe->dataLength),
Int32GetDatum(stripe->blockCount),
Int64GetDatum(stripe->rowCount)
};
Oid cstoreStripesOid = CStoreStripesRelationId();
@ -187,8 +386,10 @@ ReadTableMetadata(Oid relid)
datumArray[Anum_cstore_stripes_file_offset - 1]);
stripeMetadata->dataLength = DatumGetInt64(
datumArray[Anum_cstore_stripes_data_length - 1]);
stripeMetadata->skipListLength = DatumGetInt64(
datumArray[Anum_cstore_stripes_skiplist_length - 1]);
stripeMetadata->blockCount = DatumGetInt32(
datumArray[Anum_cstore_stripes_block_count - 1]);
stripeMetadata->rowCount = DatumGetInt64(
datumArray[Anum_cstore_stripes_row_count - 1]);
tableMetadata->stripeMetadataList = lappend(tableMetadata->stripeMetadataList,
stripeMetadata);
@ -299,8 +500,7 @@ SaveStripeFooter(Oid relid, uint64 stripe, StripeFooter *footer)
Int64GetDatum(stripe),
Int16GetDatum(attr),
Int64GetDatum(footer->existsSizeArray[attr - 1]),
Int64GetDatum(footer->valueSizeArray[attr - 1]),
Int64GetDatum(footer->skipListSizeArray[attr - 1])
Int64GetDatum(footer->valueSizeArray[attr - 1])
};
InsertTupleAndEnforceConstraints(modifyState, values, nulls);
@ -339,7 +539,6 @@ ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount)
footer = palloc0(sizeof(StripeFooter));
footer->existsSizeArray = palloc0(relationColumnCount * sizeof(int64));
footer->valueSizeArray = palloc0(relationColumnCount * sizeof(int64));
footer->skipListSizeArray = palloc0(relationColumnCount * sizeof(int64));
/*
* Stripe can have less columns than the relation if ALTER TABLE happens
@ -369,8 +568,6 @@ ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount)
DatumGetInt64(datumArray[Anum_cstore_stripe_attr_exists_size - 1]);
footer->valueSizeArray[attr - 1] =
DatumGetInt64(datumArray[Anum_cstore_stripe_attr_value_size - 1]);
footer->skipListSizeArray[attr - 1] =
DatumGetInt64(datumArray[Anum_cstore_stripe_attr_skiplist_size - 1]);
}
systable_endscan_ordered(scanDescriptor);
@ -507,6 +704,55 @@ create_estate_for_relation(Relation rel)
}
/*
* DatumToBytea serializes a datum into a bytea value.
*/
static bytea *
DatumToBytea(Datum value, Form_pg_attribute attrForm)
{
int datumLength = att_addlength_datum(0, attrForm->attlen, value);
bytea *result = palloc0(datumLength + VARHDRSZ);
SET_VARSIZE(result, datumLength + VARHDRSZ);
if (attrForm->attlen > 0)
{
if (attrForm->attbyval)
{
store_att_byval(VARDATA(result), value, attrForm->attlen);
}
else
{
memcpy(VARDATA(result), DatumGetPointer(value), attrForm->attlen);
}
}
else
{
memcpy(VARDATA(result), DatumGetPointer(value), datumLength);
}
return result;
}
/*
* ByteaToDatum deserializes a value which was previously serialized using
* DatumToBytea.
*/
static Datum
ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm)
{
/*
* We copy the data so the result of this function lives even
* after the byteaDatum is freed.
*/
char *binaryDataCopy = palloc0(VARSIZE_ANY_EXHDR(bytes));
memcpy(binaryDataCopy, VARDATA_ANY(bytes), VARSIZE_ANY_EXHDR(bytes));
return fetch_att(binaryDataCopy, attrForm->attbyval, attrForm->attlen);
}
/*
* CStoreStripeAttrRelationId returns relation id of cstore_stripe_attr.
* TODO: should we cache this similar to citus?
@ -573,6 +819,28 @@ CStoreTablesIndexRelationId(void)
}
/*
* CStoreSkipNodesRelationId returns relation id of cstore_skipnodes.
* TODO: should we cache this similar to citus?
*/
static Oid
CStoreSkipNodesRelationId(void)
{
return get_relname_relid("cstore_skipnodes", CStoreNamespaceId());
}
/*
* CStoreSkipNodesIndexRelationId returns relation id of cstore_skipnodes_pkey.
* TODO: should we cache this similar to citus?
*/
static Oid
CStoreSkipNodesIndexRelationId(void)
{
return get_relname_relid("cstore_skipnodes_pkey", CStoreNamespaceId());
}
/*
* CStoreNamespaceId returns namespace id of the schema we store cstore
* related tables.

View File

@ -34,7 +34,6 @@
#include "utils/rel.h"
#include "cstore.h"
#include "cstore_metadata_serialization.h"
#include "cstore_version_compat.h"
/* static function declarations */
@ -53,12 +52,6 @@ static ColumnBuffers * LoadColumnBuffers(Relation relation,
uint32 blockCount, uint64 existsFileOffset,
uint64 valueFileOffset,
Form_pg_attribute attributeForm);
static StripeSkipList * LoadStripeSkipList(Relation relation,
StripeMetadata *stripeMetadata,
StripeFooter *stripeFooter,
uint32 columnCount,
bool *projectedColumnMask,
TupleDesc tupleDescriptor);
static bool * SelectedBlockMask(StripeSkipList *stripeSkipList,
List *projectedColumnList, List *whereClauseList);
static List * BuildRestrictInfoList(List *whereClauseList);
@ -85,8 +78,6 @@ static Datum ColumnDefaultValue(TupleConstr *tupleConstraints,
static StringInfo ReadFromSmgr(Relation rel, uint64 offset, uint32 size);
static void ResetUncompressedBlockData(ColumnBlockData **blockDataArray,
uint32 columnCount);
static uint64 StripeRowCount(Relation relation, StripeMetadata *stripeMetadata);
static int RelationColumnCount(Oid relid);
/*
@ -327,34 +318,13 @@ CStoreTableRowCount(Relation relation)
foreach(stripeMetadataCell, tableMetadata->stripeMetadataList)
{
StripeMetadata *stripeMetadata = (StripeMetadata *) lfirst(stripeMetadataCell);
totalRowCount += StripeRowCount(relation, stripeMetadata);
totalRowCount += stripeMetadata->rowCount;
}
return totalRowCount;
}
/*
* StripeRowCount reads serialized stripe footer, the first column's
* skip list, and returns number of rows for given stripe.
*/
static uint64
StripeRowCount(Relation relation, StripeMetadata *stripeMetadata)
{
uint64 rowCount = 0;
StringInfo firstColumnSkipListBuffer = NULL;
StripeFooter *stripeFooter = ReadStripeFooter(relation->rd_id, stripeMetadata->id,
RelationColumnCount(relation->rd_id));
firstColumnSkipListBuffer = ReadFromSmgr(relation, stripeMetadata->fileOffset,
stripeFooter->skipListSizeArray[0]);
rowCount = DeserializeRowCount(firstColumnSkipListBuffer);
return rowCount;
}
/*
* LoadFilteredStripeBuffers reads serialized stripe data from the given file.
* The function skips over blocks whose rows are refuted by restriction qualifiers,
@ -373,10 +343,10 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
bool *projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList);
StripeSkipList *stripeSkipList = LoadStripeSkipList(relation, stripeMetadata,
stripeFooter, columnCount,
projectedColumnMask,
tupleDescriptor);
StripeSkipList *stripeSkipList = ReadStripeSkipList(RelationGetRelid(relation),
stripeMetadata->id,
tupleDescriptor,
stripeMetadata->blockCount);
bool *selectedBlockMask = SelectedBlockMask(stripeSkipList, projectedColumnList,
whereClauseList);
@ -387,7 +357,7 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
/* load column data for projected columns */
columnBuffersArray = palloc0(columnCount * sizeof(ColumnBuffers *));
currentColumnFileOffset = stripeMetadata->fileOffset + stripeMetadata->skipListLength;
currentColumnFileOffset = stripeMetadata->fileOffset;
for (columnIndex = 0; columnIndex < stripeFooter->columnCount; columnIndex++)
{
@ -511,98 +481,6 @@ LoadColumnBuffers(Relation relation, ColumnBlockSkipNode *blockSkipNodeArray,
}
/* Reads the skip list for the given stripe. */
static StripeSkipList *
LoadStripeSkipList(Relation relation,
StripeMetadata *stripeMetadata,
StripeFooter *stripeFooter, uint32 columnCount,
bool *projectedColumnMask,
TupleDesc tupleDescriptor)
{
StripeSkipList *stripeSkipList = NULL;
ColumnBlockSkipNode **blockSkipNodeArray = NULL;
StringInfo firstColumnSkipListBuffer = NULL;
uint64 currentColumnSkipListFileOffset = 0;
uint32 columnIndex = 0;
uint32 stripeBlockCount = 0;
uint32 stripeColumnCount = stripeFooter->columnCount;
/* deserialize block count */
firstColumnSkipListBuffer = ReadFromSmgr(relation, stripeMetadata->fileOffset,
stripeFooter->skipListSizeArray[0]);
stripeBlockCount = DeserializeBlockCount(firstColumnSkipListBuffer);
/* deserialize column skip lists */
blockSkipNodeArray = palloc0(columnCount * sizeof(ColumnBlockSkipNode *));
currentColumnSkipListFileOffset = stripeMetadata->fileOffset;
for (columnIndex = 0; columnIndex < stripeColumnCount; columnIndex++)
{
uint64 columnSkipListSize = stripeFooter->skipListSizeArray[columnIndex];
bool firstColumn = columnIndex == 0;
/*
* Only selected columns' column skip lists are read. However, the first
* column's skip list is read regardless of being selected. It is used by
* StripeSkipListRowCount later.
*/
if (projectedColumnMask[columnIndex] || firstColumn)
{
Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, columnIndex);
StringInfo columnSkipListBuffer =
ReadFromSmgr(relation, currentColumnSkipListFileOffset,
columnSkipListSize);
ColumnBlockSkipNode *columnSkipList =
DeserializeColumnSkipList(columnSkipListBuffer, attributeForm->attbyval,
attributeForm->attlen, stripeBlockCount);
blockSkipNodeArray[columnIndex] = columnSkipList;
}
currentColumnSkipListFileOffset += columnSkipListSize;
}
/* table contains additional columns added after this stripe is created */
for (columnIndex = stripeColumnCount; columnIndex < columnCount; columnIndex++)
{
ColumnBlockSkipNode *columnSkipList = NULL;
uint32 blockIndex = 0;
bool firstColumn = columnIndex == 0;
/* no need to create ColumnBlockSkipList if the column is not selected */
if (!projectedColumnMask[columnIndex] && !firstColumn)
{
blockSkipNodeArray[columnIndex] = NULL;
continue;
}
/* create empty ColumnBlockSkipNode for missing columns*/
columnSkipList = palloc0(stripeBlockCount * sizeof(ColumnBlockSkipNode));
for (blockIndex = 0; blockIndex < stripeBlockCount; blockIndex++)
{
columnSkipList[blockIndex].rowCount = 0;
columnSkipList[blockIndex].hasMinMax = false;
columnSkipList[blockIndex].minimumValue = 0;
columnSkipList[blockIndex].maximumValue = 0;
columnSkipList[blockIndex].existsBlockOffset = 0;
columnSkipList[blockIndex].valueBlockOffset = 0;
columnSkipList[blockIndex].existsLength = 0;
columnSkipList[blockIndex].valueLength = 0;
columnSkipList[blockIndex].valueCompressionType = COMPRESSION_NONE;
}
blockSkipNodeArray[columnIndex] = columnSkipList;
}
stripeSkipList = palloc0(sizeof(StripeSkipList));
stripeSkipList->blockSkipNodeArray = blockSkipNodeArray;
stripeSkipList->columnCount = columnCount;
stripeSkipList->blockCount = stripeBlockCount;
return stripeSkipList;
}
/*
* SelectedBlockMask walks over each column's blocks and checks if a block can
* be filtered without reading its data. The filtering happens when all rows in
@ -1207,15 +1085,3 @@ ResetUncompressedBlockData(ColumnBlockData **blockDataArray, uint32 columnCount)
}
}
}
static int
RelationColumnCount(Oid relid)
{
Relation rel = RelationIdGetRelation(relid);
TupleDesc tupleDesc = RelationGetDescr(rel);
int columnCount = tupleDesc->natts;
RelationClose(rel);
return columnCount;
}

View File

@ -24,7 +24,6 @@
#include "utils/rel.h"
#include "cstore.h"
#include "cstore_metadata_serialization.h"
#include "cstore_version_compat.h"
static StripeBuffers * CreateEmptyStripeBuffers(uint32 stripeMaxRowCount,
@ -34,10 +33,7 @@ static StripeSkipList * CreateEmptyStripeSkipList(uint32 stripeMaxRowCount,
uint32 blockRowCount,
uint32 columnCount);
static StripeMetadata FlushStripe(TableWriteState *writeState);
static StringInfo * CreateSkipListBufferArray(StripeSkipList *stripeSkipList,
TupleDesc tupleDescriptor);
static StripeFooter * CreateStripeFooter(StripeSkipList *stripeSkipList,
StringInfo *skipListBufferArray);
static StripeFooter * CreateStripeFooter(StripeSkipList *stripeSkipList);
static StringInfo SerializeBoolArray(bool *boolArray, uint32 boolArrayLength);
static void SerializeSingleDatum(StringInfo datumBuffer, Datum datum,
bool datumTypeByValue, int datumTypeLength,
@ -90,9 +86,7 @@ CStoreBeginWrite(Oid relationId,
uint64 lastStripeSize = 0;
lastStripe = llast(tableMetadata->stripeMetadataList);
lastStripeSize += lastStripe->skipListLength;
lastStripeSize += lastStripe->dataLength;
lastStripeSize += lastStripe->footerLength;
currentFileOffset = lastStripe->fileOffset + lastStripeSize;
currentStripeId = lastStripe->id + 1;
@ -429,10 +423,8 @@ WriteToSmgr(TableWriteState *writeState, char *data, uint32 dataLength)
static StripeMetadata
FlushStripe(TableWriteState *writeState)
{
StripeMetadata stripeMetadata = { 0, 0, 0, 0 };
uint64 skipListLength = 0;
StripeMetadata stripeMetadata = { 0 };
uint64 dataLength = 0;
StringInfo *skipListBufferArray = NULL;
StripeFooter *stripeFooter = NULL;
uint32 columnIndex = 0;
uint32 blockIndex = 0;
@ -486,32 +478,21 @@ FlushStripe(TableWriteState *writeState)
}
/* create skip list and footer buffers */
skipListBufferArray = CreateSkipListBufferArray(stripeSkipList, tupleDescriptor);
stripeFooter = CreateStripeFooter(stripeSkipList, skipListBufferArray);
SaveStripeSkipList(writeState->relationId, writeState->currentStripeId,
stripeSkipList, tupleDescriptor);
stripeFooter = CreateStripeFooter(stripeSkipList);
/*
* Each stripe has three sections:
* (1) Skip list, which contains statistics for each column block, and can
* be used to skip reading row blocks that are refuted by WHERE clause list,
* (2) Data section, in which we store data for each column continuously.
* Each stripe has only one section:
* Data section, in which we store data for each column continuously.
* We store data for each for each column in blocks. For each block, we
* store two buffers: "exists" buffer, and "value" buffer. "exists" buffer
* tells which values are not NULL. "value" buffer contains values for
* present values. For each column, we first store all "exists" buffers,
* and then all "value" buffers.
* (3) Stripe footer, which contains the skip list buffer size, exists buffer
* size, and value buffer size for each of the columns.
*
* We start by flushing the skip list buffers.
*/
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
StringInfo skipListBuffer = skipListBufferArray[columnIndex];
WriteToSmgr(writeState, skipListBuffer->data, skipListBuffer->len);
writeState->currentFileOffset += skipListBuffer->len;
}
/* then, we flush the data buffers */
/* flush the data buffers */
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex];
@ -546,60 +527,32 @@ FlushStripe(TableWriteState *writeState)
/* set stripe metadata */
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
skipListLength += stripeFooter->skipListSizeArray[columnIndex];
dataLength += stripeFooter->existsSizeArray[columnIndex];
dataLength += stripeFooter->valueSizeArray[columnIndex];
}
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
{
stripeMetadata.rowCount +=
stripeSkipList->blockSkipNodeArray[0][blockIndex].rowCount;
}
stripeMetadata.fileOffset = initialFileOffset;
stripeMetadata.skipListLength = skipListLength;
stripeMetadata.dataLength = dataLength;
stripeMetadata.footerLength = 0;
stripeMetadata.id = writeState->currentStripeId;
stripeMetadata.blockCount = blockCount;
return stripeMetadata;
}
/*
* CreateSkipListBufferArray serializes the skip list for each column of the
* given stripe and returns the result as an array.
*/
static StringInfo *
CreateSkipListBufferArray(StripeSkipList *stripeSkipList, TupleDesc tupleDescriptor)
{
StringInfo *skipListBufferArray = NULL;
uint32 columnIndex = 0;
uint32 columnCount = stripeSkipList->columnCount;
skipListBufferArray = palloc0(columnCount * sizeof(StringInfo));
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
StringInfo skipListBuffer = NULL;
ColumnBlockSkipNode *blockSkipNodeArray =
stripeSkipList->blockSkipNodeArray[columnIndex];
Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, columnIndex);
skipListBuffer = SerializeColumnSkipList(blockSkipNodeArray,
stripeSkipList->blockCount,
attributeForm->attbyval,
attributeForm->attlen);
skipListBufferArray[columnIndex] = skipListBuffer;
}
return skipListBufferArray;
}
/* Creates and returns the footer for given stripe. */
static StripeFooter *
CreateStripeFooter(StripeSkipList *stripeSkipList, StringInfo *skipListBufferArray)
CreateStripeFooter(StripeSkipList *stripeSkipList)
{
StripeFooter *stripeFooter = NULL;
uint32 columnIndex = 0;
uint32 columnCount = stripeSkipList->columnCount;
uint64 *skipListSizeArray = palloc0(columnCount * sizeof(uint64));
uint64 *existsSizeArray = palloc0(columnCount * sizeof(uint64));
uint64 *valueSizeArray = palloc0(columnCount * sizeof(uint64));
@ -614,12 +567,10 @@ CreateStripeFooter(StripeSkipList *stripeSkipList, StringInfo *skipListBufferArr
existsSizeArray[columnIndex] += blockSkipNodeArray[blockIndex].existsLength;
valueSizeArray[columnIndex] += blockSkipNodeArray[blockIndex].valueLength;
}
skipListSizeArray[columnIndex] = skipListBufferArray[columnIndex]->len;
}
stripeFooter = palloc0(sizeof(StripeFooter));
stripeFooter->columnCount = columnCount;
stripeFooter->skipListSizeArray = skipListSizeArray;
stripeFooter->existsSizeArray = existsSizeArray;
stripeFooter->valueSizeArray = valueSizeArray;