mirror of https://github.com/citusdata/citus.git
Merge pull request #5 from citusdata/skiplist_to_metadata_tables
Move skipnodes to metadata tablesmerge-cstore-pykello
commit
c1cf3fe6e7
9
Makefile
9
Makefile
|
@ -6,10 +6,8 @@
|
||||||
MODULE_big = cstore_fdw
|
MODULE_big = cstore_fdw
|
||||||
|
|
||||||
PG_CPPFLAGS = -std=c11
|
PG_CPPFLAGS = -std=c11
|
||||||
SHLIB_LINK = -lprotobuf-c
|
OBJS = cstore.o cstore_fdw.o cstore_writer.o cstore_reader.o \
|
||||||
OBJS = cstore.pb-c.o cstore.o cstore_fdw.o cstore_writer.o cstore_reader.o \
|
cstore_compression.o mod.o cstore_metadata_tables.o
|
||||||
cstore_metadata_serialization.o cstore_compression.o mod.o \
|
|
||||||
cstore_metadata_tables.o
|
|
||||||
|
|
||||||
EXTENSION = cstore_fdw
|
EXTENSION = cstore_fdw
|
||||||
DATA = cstore_fdw--1.7.sql cstore_fdw--1.6--1.7.sql cstore_fdw--1.5--1.6.sql cstore_fdw--1.4--1.5.sql \
|
DATA = cstore_fdw--1.7.sql cstore_fdw--1.6--1.7.sql cstore_fdw--1.5--1.6.sql cstore_fdw--1.4--1.5.sql \
|
||||||
|
@ -51,9 +49,6 @@ ifeq (,$(findstring $(MAJORVERSION), 9.3 9.4 9.5 9.6 10 11 12))
|
||||||
$(error PostgreSQL 9.3 to 12 is required to compile this extension)
|
$(error PostgreSQL 9.3 to 12 is required to compile this extension)
|
||||||
endif
|
endif
|
||||||
|
|
||||||
cstore.pb-c.c: cstore.proto
|
|
||||||
protoc-c --c_out=. cstore.proto
|
|
||||||
|
|
||||||
installcheck: remove_cstore_files
|
installcheck: remove_cstore_files
|
||||||
|
|
||||||
remove_cstore_files:
|
remove_cstore_files:
|
||||||
|
|
10
cstore.h
10
cstore.h
|
@ -81,9 +81,9 @@ typedef struct CStoreOptions
|
||||||
typedef struct StripeMetadata
|
typedef struct StripeMetadata
|
||||||
{
|
{
|
||||||
uint64 fileOffset;
|
uint64 fileOffset;
|
||||||
uint64 skipListLength;
|
|
||||||
uint64 dataLength;
|
uint64 dataLength;
|
||||||
uint64 footerLength;
|
uint32 blockCount;
|
||||||
|
uint64 rowCount;
|
||||||
uint64 id;
|
uint64 id;
|
||||||
} StripeMetadata;
|
} StripeMetadata;
|
||||||
|
|
||||||
|
@ -191,7 +191,6 @@ typedef struct StripeBuffers
|
||||||
typedef struct StripeFooter
|
typedef struct StripeFooter
|
||||||
{
|
{
|
||||||
uint32 columnCount;
|
uint32 columnCount;
|
||||||
uint64 *skipListSizeArray;
|
|
||||||
uint64 *existsSizeArray;
|
uint64 *existsSizeArray;
|
||||||
uint64 *valueSizeArray;
|
uint64 *valueSizeArray;
|
||||||
} StripeFooter;
|
} StripeFooter;
|
||||||
|
@ -293,6 +292,11 @@ extern StripeFooter * ReadStripeFooter(Oid relid, uint64 stripe, int relationCol
|
||||||
extern void InitCStoreTableMetadata(Oid relid, int blockRowCount);
|
extern void InitCStoreTableMetadata(Oid relid, int blockRowCount);
|
||||||
extern void InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe);
|
extern void InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe);
|
||||||
extern TableMetadata * ReadTableMetadata(Oid relid);
|
extern TableMetadata * ReadTableMetadata(Oid relid);
|
||||||
|
extern void SaveStripeSkipList(Oid relid, uint64 stripe, StripeSkipList *stripeSkipList,
|
||||||
|
TupleDesc tupleDescriptor);
|
||||||
|
extern StripeSkipList * ReadStripeSkipList(Oid relid, uint64 stripe,
|
||||||
|
TupleDesc tupleDescriptor,
|
||||||
|
uint32 blockCount);
|
||||||
|
|
||||||
typedef struct SmgrAddr
|
typedef struct SmgrAddr
|
||||||
{
|
{
|
||||||
|
|
24
cstore.proto
24
cstore.proto
|
@ -1,24 +0,0 @@
|
||||||
syntax = "proto2";
|
|
||||||
|
|
||||||
package protobuf;
|
|
||||||
|
|
||||||
enum CompressionType {
|
|
||||||
// Values should match with the corresponding struct in cstore_fdw.h
|
|
||||||
NONE = 0;
|
|
||||||
PG_LZ = 1;
|
|
||||||
};
|
|
||||||
|
|
||||||
message ColumnBlockSkipNode {
|
|
||||||
optional uint64 rowCount = 1;
|
|
||||||
optional bytes minimumValue = 2;
|
|
||||||
optional bytes maximumValue = 3;
|
|
||||||
optional uint64 valueBlockOffset = 4;
|
|
||||||
optional uint64 valueLength = 5;
|
|
||||||
optional CompressionType valueCompressionType = 6;
|
|
||||||
optional uint64 existsBlockOffset = 7;
|
|
||||||
optional uint64 existsLength = 8;
|
|
||||||
}
|
|
||||||
|
|
||||||
message ColumnBlockSkipList {
|
|
||||||
repeated ColumnBlockSkipNode blockSkipNodeArray = 1;
|
|
||||||
}
|
|
|
@ -68,23 +68,48 @@ CREATE TABLE cstore.cstore_tables (
|
||||||
PRIMARY KEY (relid)
|
PRIMARY KEY (relid)
|
||||||
) WITH (user_catalog_table = true);
|
) WITH (user_catalog_table = true);
|
||||||
|
|
||||||
|
COMMENT ON TABLE cstore.cstore_tables IS 'CStore table wide metadata';
|
||||||
|
|
||||||
CREATE TABLE cstore.cstore_stripes (
|
CREATE TABLE cstore.cstore_stripes (
|
||||||
relid oid NOT NULL,
|
relid oid NOT NULL,
|
||||||
stripe bigint NOT NULL,
|
stripe bigint NOT NULL,
|
||||||
file_offset bigint NOT NULL,
|
file_offset bigint NOT NULL,
|
||||||
skiplist_length bigint NOT NULL,
|
|
||||||
data_length bigint NOT NULL,
|
data_length bigint NOT NULL,
|
||||||
|
block_count int NOT NULL,
|
||||||
|
row_count bigint NOT NULL,
|
||||||
PRIMARY KEY (relid, stripe),
|
PRIMARY KEY (relid, stripe),
|
||||||
FOREIGN KEY (relid) REFERENCES cstore.cstore_tables(relid) ON DELETE CASCADE INITIALLY DEFERRED
|
FOREIGN KEY (relid) REFERENCES cstore.cstore_tables(relid) ON DELETE CASCADE INITIALLY DEFERRED
|
||||||
) WITH (user_catalog_table = true);
|
) WITH (user_catalog_table = true);
|
||||||
|
|
||||||
|
COMMENT ON TABLE cstore.cstore_tables IS 'CStore per stripe metadata';
|
||||||
|
|
||||||
CREATE TABLE cstore.cstore_stripe_attr (
|
CREATE TABLE cstore.cstore_stripe_attr (
|
||||||
relid oid NOT NULL,
|
relid oid NOT NULL,
|
||||||
stripe bigint NOT NULL,
|
stripe bigint NOT NULL,
|
||||||
attr int NOT NULL,
|
attr int NOT NULL,
|
||||||
exists_size bigint NOT NULL,
|
exists_size bigint NOT NULL,
|
||||||
value_size bigint NOT NULL,
|
value_size bigint NOT NULL,
|
||||||
skiplist_size bigint NOT NULL,
|
|
||||||
PRIMARY KEY (relid, stripe, attr),
|
PRIMARY KEY (relid, stripe, attr),
|
||||||
FOREIGN KEY (relid, stripe) REFERENCES cstore.cstore_stripes(relid, stripe) ON DELETE CASCADE INITIALLY DEFERRED
|
FOREIGN KEY (relid, stripe) REFERENCES cstore.cstore_stripes(relid, stripe) ON DELETE CASCADE INITIALLY DEFERRED
|
||||||
) WITH (user_catalog_table = true);
|
) WITH (user_catalog_table = true);
|
||||||
|
|
||||||
|
COMMENT ON TABLE cstore.cstore_tables IS 'CStore per stripe/column combination metadata';
|
||||||
|
|
||||||
|
CREATE TABLE cstore.cstore_skipnodes (
|
||||||
|
relid oid NOT NULL,
|
||||||
|
stripe bigint NOT NULL,
|
||||||
|
attr int NOT NULL,
|
||||||
|
block int NOT NULL,
|
||||||
|
row_count bigint NOT NULL,
|
||||||
|
minimum_value bytea,
|
||||||
|
maximum_value bytea,
|
||||||
|
value_stream_offset bigint NOT NULL,
|
||||||
|
value_stream_length bigint NOT NULL,
|
||||||
|
exists_stream_offset bigint NOT NULL,
|
||||||
|
exists_stream_length bigint NOT NULL,
|
||||||
|
value_compression_type int NOT NULL,
|
||||||
|
PRIMARY KEY (relid, stripe, attr, block),
|
||||||
|
FOREIGN KEY (relid, stripe, attr) REFERENCES cstore.cstore_stripe_attr(relid, stripe, attr) ON DELETE CASCADE INITIALLY DEFERRED
|
||||||
|
) WITH (user_catalog_table = true);
|
||||||
|
|
||||||
|
COMMENT ON TABLE cstore.cstore_tables IS 'CStore per block metadata';
|
||||||
|
|
|
@ -1,302 +0,0 @@
|
||||||
/*-------------------------------------------------------------------------
|
|
||||||
*
|
|
||||||
* cstore_metadata_serialization.c
|
|
||||||
*
|
|
||||||
* This file contains function definitions for serializing/deserializing cstore
|
|
||||||
* metadata.
|
|
||||||
*
|
|
||||||
* Copyright (c) 2016, Citus Data, Inc.
|
|
||||||
*
|
|
||||||
* $Id$
|
|
||||||
*
|
|
||||||
*-------------------------------------------------------------------------
|
|
||||||
*/
|
|
||||||
|
|
||||||
|
|
||||||
#include "postgres.h"
|
|
||||||
|
|
||||||
#include "access/tupmacs.h"
|
|
||||||
|
|
||||||
#include "cstore.h"
|
|
||||||
#include "cstore_metadata_serialization.h"
|
|
||||||
#include "cstore.pb-c.h"
|
|
||||||
|
|
||||||
/* local functions forward declarations */
|
|
||||||
static ProtobufCBinaryData DatumToProtobufBinary(Datum datum, bool typeByValue,
|
|
||||||
int typeLength);
|
|
||||||
static Datum ProtobufBinaryToDatum(ProtobufCBinaryData protobufBinary,
|
|
||||||
bool typeByValue, int typeLength);
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* SerializeColumnSkipList serializes a column skip list, where the colum skip
|
|
||||||
* list includes all block skip nodes for that column. The function then returns
|
|
||||||
* the result as a string info.
|
|
||||||
*/
|
|
||||||
StringInfo
|
|
||||||
SerializeColumnSkipList(ColumnBlockSkipNode *blockSkipNodeArray, uint32 blockCount,
|
|
||||||
bool typeByValue, int typeLength)
|
|
||||||
{
|
|
||||||
StringInfo blockSkipListBuffer = NULL;
|
|
||||||
Protobuf__ColumnBlockSkipList protobufBlockSkipList =
|
|
||||||
PROTOBUF__COLUMN_BLOCK_SKIP_LIST__INIT;
|
|
||||||
Protobuf__ColumnBlockSkipNode **protobufBlockSkipNodeArray = NULL;
|
|
||||||
uint32 blockIndex = 0;
|
|
||||||
uint8 *blockSkipListData = NULL;
|
|
||||||
uint32 blockSkipListSize = 0;
|
|
||||||
|
|
||||||
protobufBlockSkipNodeArray = palloc0(blockCount *
|
|
||||||
sizeof(Protobuf__ColumnBlockSkipNode *));
|
|
||||||
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
|
|
||||||
{
|
|
||||||
ColumnBlockSkipNode blockSkipNode = blockSkipNodeArray[blockIndex];
|
|
||||||
Protobuf__ColumnBlockSkipNode *protobufBlockSkipNode = NULL;
|
|
||||||
ProtobufCBinaryData binaryMinimumValue = { 0, 0 };
|
|
||||||
ProtobufCBinaryData binaryMaximumValue = { 0, 0 };
|
|
||||||
|
|
||||||
if (blockSkipNode.hasMinMax)
|
|
||||||
{
|
|
||||||
binaryMinimumValue = DatumToProtobufBinary(blockSkipNode.minimumValue,
|
|
||||||
typeByValue, typeLength);
|
|
||||||
binaryMaximumValue = DatumToProtobufBinary(blockSkipNode.maximumValue,
|
|
||||||
typeByValue, typeLength);
|
|
||||||
}
|
|
||||||
|
|
||||||
protobufBlockSkipNode = palloc0(sizeof(Protobuf__ColumnBlockSkipNode));
|
|
||||||
protobuf__column_block_skip_node__init(protobufBlockSkipNode);
|
|
||||||
protobufBlockSkipNode->has_rowcount = true;
|
|
||||||
protobufBlockSkipNode->rowcount = blockSkipNode.rowCount;
|
|
||||||
protobufBlockSkipNode->has_minimumvalue = blockSkipNode.hasMinMax;
|
|
||||||
protobufBlockSkipNode->minimumvalue = binaryMinimumValue;
|
|
||||||
protobufBlockSkipNode->has_maximumvalue = blockSkipNode.hasMinMax;
|
|
||||||
protobufBlockSkipNode->maximumvalue = binaryMaximumValue;
|
|
||||||
protobufBlockSkipNode->has_valueblockoffset = true;
|
|
||||||
protobufBlockSkipNode->valueblockoffset = blockSkipNode.valueBlockOffset;
|
|
||||||
protobufBlockSkipNode->has_valuelength = true;
|
|
||||||
protobufBlockSkipNode->valuelength = blockSkipNode.valueLength;
|
|
||||||
protobufBlockSkipNode->has_existsblockoffset = true;
|
|
||||||
protobufBlockSkipNode->existsblockoffset = blockSkipNode.existsBlockOffset;
|
|
||||||
protobufBlockSkipNode->has_existslength = true;
|
|
||||||
protobufBlockSkipNode->existslength = blockSkipNode.existsLength;
|
|
||||||
protobufBlockSkipNode->has_valuecompressiontype = true;
|
|
||||||
protobufBlockSkipNode->valuecompressiontype =
|
|
||||||
(Protobuf__CompressionType) blockSkipNode.valueCompressionType;
|
|
||||||
|
|
||||||
protobufBlockSkipNodeArray[blockIndex] = protobufBlockSkipNode;
|
|
||||||
}
|
|
||||||
|
|
||||||
protobufBlockSkipList.n_blockskipnodearray = blockCount;
|
|
||||||
protobufBlockSkipList.blockskipnodearray = protobufBlockSkipNodeArray;
|
|
||||||
|
|
||||||
blockSkipListSize =
|
|
||||||
protobuf__column_block_skip_list__get_packed_size(&protobufBlockSkipList);
|
|
||||||
blockSkipListData = palloc0(blockSkipListSize);
|
|
||||||
protobuf__column_block_skip_list__pack(&protobufBlockSkipList, blockSkipListData);
|
|
||||||
|
|
||||||
blockSkipListBuffer = palloc0(sizeof(StringInfoData));
|
|
||||||
blockSkipListBuffer->len = blockSkipListSize;
|
|
||||||
blockSkipListBuffer->maxlen = blockSkipListSize;
|
|
||||||
blockSkipListBuffer->data = (char *) blockSkipListData;
|
|
||||||
|
|
||||||
return blockSkipListBuffer;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* DeserializeBlockCount deserializes the given column skip list buffer and
|
|
||||||
* returns the number of blocks in column skip list.
|
|
||||||
*/
|
|
||||||
uint32
|
|
||||||
DeserializeBlockCount(StringInfo buffer)
|
|
||||||
{
|
|
||||||
uint32 blockCount = 0;
|
|
||||||
Protobuf__ColumnBlockSkipList *protobufBlockSkipList = NULL;
|
|
||||||
|
|
||||||
protobufBlockSkipList =
|
|
||||||
protobuf__column_block_skip_list__unpack(NULL, buffer->len,
|
|
||||||
(uint8 *) buffer->data);
|
|
||||||
if (protobufBlockSkipList == NULL)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("could not unpack column store"),
|
|
||||||
errdetail("invalid skip list buffer")));
|
|
||||||
}
|
|
||||||
|
|
||||||
blockCount = protobufBlockSkipList->n_blockskipnodearray;
|
|
||||||
|
|
||||||
protobuf__column_block_skip_list__free_unpacked(protobufBlockSkipList, NULL);
|
|
||||||
|
|
||||||
return blockCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* DeserializeRowCount deserializes the given column skip list buffer and
|
|
||||||
* returns the total number of rows in block skip list.
|
|
||||||
*/
|
|
||||||
uint32
|
|
||||||
DeserializeRowCount(StringInfo buffer)
|
|
||||||
{
|
|
||||||
uint32 rowCount = 0;
|
|
||||||
Protobuf__ColumnBlockSkipList *protobufBlockSkipList = NULL;
|
|
||||||
uint32 blockIndex = 0;
|
|
||||||
uint32 blockCount = 0;
|
|
||||||
|
|
||||||
protobufBlockSkipList =
|
|
||||||
protobuf__column_block_skip_list__unpack(NULL, buffer->len,
|
|
||||||
(uint8 *) buffer->data);
|
|
||||||
if (protobufBlockSkipList == NULL)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("could not unpack column store"),
|
|
||||||
errdetail("invalid skip list buffer")));
|
|
||||||
}
|
|
||||||
|
|
||||||
blockCount = (uint32) protobufBlockSkipList->n_blockskipnodearray;
|
|
||||||
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
|
|
||||||
{
|
|
||||||
Protobuf__ColumnBlockSkipNode *protobufBlockSkipNode =
|
|
||||||
protobufBlockSkipList->blockskipnodearray[blockIndex];
|
|
||||||
rowCount += protobufBlockSkipNode->rowcount;
|
|
||||||
}
|
|
||||||
|
|
||||||
protobuf__column_block_skip_list__free_unpacked(protobufBlockSkipList, NULL);
|
|
||||||
|
|
||||||
return rowCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* DeserializeColumnSkipList deserializes the given buffer and returns the result as
|
|
||||||
* a ColumnBlockSkipNode array. If the number of unpacked block skip nodes are not
|
|
||||||
* equal to the given block count function errors out.
|
|
||||||
*/
|
|
||||||
ColumnBlockSkipNode *
|
|
||||||
DeserializeColumnSkipList(StringInfo buffer, bool typeByValue, int typeLength,
|
|
||||||
uint32 blockCount)
|
|
||||||
{
|
|
||||||
ColumnBlockSkipNode *blockSkipNodeArray = NULL;
|
|
||||||
uint32 blockIndex = 0;
|
|
||||||
Protobuf__ColumnBlockSkipList *protobufBlockSkipList = NULL;
|
|
||||||
|
|
||||||
protobufBlockSkipList =
|
|
||||||
protobuf__column_block_skip_list__unpack(NULL, buffer->len,
|
|
||||||
(uint8 *) buffer->data);
|
|
||||||
if (protobufBlockSkipList == NULL)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("could not unpack column store"),
|
|
||||||
errdetail("invalid skip list buffer")));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (protobufBlockSkipList->n_blockskipnodearray != blockCount)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("could not unpack column store"),
|
|
||||||
errdetail("block skip node count and block count don't match")));
|
|
||||||
}
|
|
||||||
|
|
||||||
blockSkipNodeArray = palloc0(blockCount * sizeof(ColumnBlockSkipNode));
|
|
||||||
|
|
||||||
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
|
|
||||||
{
|
|
||||||
Protobuf__ColumnBlockSkipNode *protobufBlockSkipNode = NULL;
|
|
||||||
ColumnBlockSkipNode *blockSkipNode = NULL;
|
|
||||||
bool hasMinMax = false;
|
|
||||||
Datum minimumValue = 0;
|
|
||||||
Datum maximumValue = 0;
|
|
||||||
|
|
||||||
protobufBlockSkipNode = protobufBlockSkipList->blockskipnodearray[blockIndex];
|
|
||||||
if (!protobufBlockSkipNode->has_rowcount ||
|
|
||||||
!protobufBlockSkipNode->has_existsblockoffset ||
|
|
||||||
!protobufBlockSkipNode->has_valueblockoffset ||
|
|
||||||
!protobufBlockSkipNode->has_existslength ||
|
|
||||||
!protobufBlockSkipNode->has_valuelength ||
|
|
||||||
!protobufBlockSkipNode->has_valuecompressiontype)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("could not unpack column store"),
|
|
||||||
errdetail("missing required block skip node metadata")));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (protobufBlockSkipNode->has_minimumvalue !=
|
|
||||||
protobufBlockSkipNode->has_maximumvalue)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("could not unpack column store"),
|
|
||||||
errdetail("has minimum and has maximum fields "
|
|
||||||
"don't match")));
|
|
||||||
}
|
|
||||||
|
|
||||||
hasMinMax = protobufBlockSkipNode->has_minimumvalue;
|
|
||||||
if (hasMinMax)
|
|
||||||
{
|
|
||||||
minimumValue = ProtobufBinaryToDatum(protobufBlockSkipNode->minimumvalue,
|
|
||||||
typeByValue, typeLength);
|
|
||||||
maximumValue = ProtobufBinaryToDatum(protobufBlockSkipNode->maximumvalue,
|
|
||||||
typeByValue, typeLength);
|
|
||||||
}
|
|
||||||
|
|
||||||
blockSkipNode = &blockSkipNodeArray[blockIndex];
|
|
||||||
blockSkipNode->rowCount = protobufBlockSkipNode->rowcount;
|
|
||||||
blockSkipNode->hasMinMax = hasMinMax;
|
|
||||||
blockSkipNode->minimumValue = minimumValue;
|
|
||||||
blockSkipNode->maximumValue = maximumValue;
|
|
||||||
blockSkipNode->existsBlockOffset = protobufBlockSkipNode->existsblockoffset;
|
|
||||||
blockSkipNode->valueBlockOffset = protobufBlockSkipNode->valueblockoffset;
|
|
||||||
blockSkipNode->existsLength = protobufBlockSkipNode->existslength;
|
|
||||||
blockSkipNode->valueLength = protobufBlockSkipNode->valuelength;
|
|
||||||
blockSkipNode->valueCompressionType =
|
|
||||||
(CompressionType) protobufBlockSkipNode->valuecompressiontype;
|
|
||||||
}
|
|
||||||
|
|
||||||
protobuf__column_block_skip_list__free_unpacked(protobufBlockSkipList, NULL);
|
|
||||||
|
|
||||||
return blockSkipNodeArray;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* Converts a datum to a ProtobufCBinaryData. */
|
|
||||||
static ProtobufCBinaryData
|
|
||||||
DatumToProtobufBinary(Datum datum, bool datumTypeByValue, int datumTypeLength)
|
|
||||||
{
|
|
||||||
ProtobufCBinaryData protobufBinary = { 0, 0 };
|
|
||||||
|
|
||||||
int datumLength = att_addlength_datum(0, datumTypeLength, datum);
|
|
||||||
char *datumBuffer = palloc0(datumLength);
|
|
||||||
|
|
||||||
if (datumTypeLength > 0)
|
|
||||||
{
|
|
||||||
if (datumTypeByValue)
|
|
||||||
{
|
|
||||||
store_att_byval(datumBuffer, datum, datumTypeLength);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
memcpy(datumBuffer, DatumGetPointer(datum), datumTypeLength);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
memcpy(datumBuffer, DatumGetPointer(datum), datumLength);
|
|
||||||
}
|
|
||||||
|
|
||||||
protobufBinary.data = (uint8 *) datumBuffer;
|
|
||||||
protobufBinary.len = datumLength;
|
|
||||||
|
|
||||||
return protobufBinary;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* Converts the given ProtobufCBinaryData to a Datum. */
|
|
||||||
static Datum
|
|
||||||
ProtobufBinaryToDatum(ProtobufCBinaryData protobufBinary, bool datumTypeByValue,
|
|
||||||
int datumTypeLength)
|
|
||||||
{
|
|
||||||
Datum datum = 0;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* We copy the protobuf data so the result of this function lives even
|
|
||||||
* after the unpacked protobuf struct is freed.
|
|
||||||
*/
|
|
||||||
char *binaryDataCopy = palloc0(protobufBinary.len);
|
|
||||||
memcpy(binaryDataCopy, protobufBinary.data, protobufBinary.len);
|
|
||||||
|
|
||||||
datum = fetch_att(binaryDataCopy, datumTypeByValue, datumTypeLength);
|
|
||||||
|
|
||||||
return datum;
|
|
||||||
}
|
|
|
@ -1,31 +0,0 @@
|
||||||
/*-------------------------------------------------------------------------
|
|
||||||
*
|
|
||||||
* cstore_metadata_serialization.h
|
|
||||||
*
|
|
||||||
* Type and function declarations to serialize/deserialize cstore metadata.
|
|
||||||
*
|
|
||||||
* Copyright (c) 2016, Citus Data, Inc.
|
|
||||||
*
|
|
||||||
* $Id$
|
|
||||||
*
|
|
||||||
*-------------------------------------------------------------------------
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef CSTORE_SERIALIZATION_H
|
|
||||||
#define CSTORE_SERIALIZATION_H
|
|
||||||
|
|
||||||
/* Function declarations for metadata serialization */
|
|
||||||
extern StringInfo SerializeColumnSkipList(ColumnBlockSkipNode *blockSkipNodeArray,
|
|
||||||
uint32 blockCount, bool typeByValue,
|
|
||||||
int typeLength);
|
|
||||||
|
|
||||||
/* Function declarations for metadata deserialization */
|
|
||||||
extern void DeserializePostScript(StringInfo buffer, uint64 *tableFooterLength);
|
|
||||||
extern uint32 DeserializeBlockCount(StringInfo buffer);
|
|
||||||
extern uint32 DeserializeRowCount(StringInfo buffer);
|
|
||||||
extern ColumnBlockSkipNode * DeserializeColumnSkipList(StringInfo buffer,
|
|
||||||
bool typeByValue, int typeLength,
|
|
||||||
uint32 blockCount);
|
|
||||||
|
|
||||||
|
|
||||||
#endif /* CSTORE_SERIALIZATION_H */
|
|
|
@ -31,13 +31,12 @@
|
||||||
#include "lib/stringinfo.h"
|
#include "lib/stringinfo.h"
|
||||||
#include "port.h"
|
#include "port.h"
|
||||||
#include "storage/fd.h"
|
#include "storage/fd.h"
|
||||||
|
#include "utils/builtins.h"
|
||||||
#include "utils/fmgroids.h"
|
#include "utils/fmgroids.h"
|
||||||
#include "utils/memutils.h"
|
#include "utils/memutils.h"
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
|
|
||||||
#include "cstore_metadata_serialization.h"
|
|
||||||
|
|
||||||
typedef struct
|
typedef struct
|
||||||
{
|
{
|
||||||
Relation rel;
|
Relation rel;
|
||||||
|
@ -50,6 +49,8 @@ static Oid CStoreStripesRelationId(void);
|
||||||
static Oid CStoreStripesIndexRelationId(void);
|
static Oid CStoreStripesIndexRelationId(void);
|
||||||
static Oid CStoreTablesRelationId(void);
|
static Oid CStoreTablesRelationId(void);
|
||||||
static Oid CStoreTablesIndexRelationId(void);
|
static Oid CStoreTablesIndexRelationId(void);
|
||||||
|
static Oid CStoreSkipNodesRelationId(void);
|
||||||
|
static Oid CStoreSkipNodesIndexRelationId(void);
|
||||||
static Oid CStoreNamespaceId(void);
|
static Oid CStoreNamespaceId(void);
|
||||||
static int TableBlockRowCount(Oid relid);
|
static int TableBlockRowCount(Oid relid);
|
||||||
static void DeleteTableMetadataRowIfExists(Oid relid);
|
static void DeleteTableMetadataRowIfExists(Oid relid);
|
||||||
|
@ -59,15 +60,16 @@ static void InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values,
|
||||||
static void DeleteTupleAndEnforceConstraints(ModifyState *state, HeapTuple heapTuple);
|
static void DeleteTupleAndEnforceConstraints(ModifyState *state, HeapTuple heapTuple);
|
||||||
static void FinishModifyRelation(ModifyState *state);
|
static void FinishModifyRelation(ModifyState *state);
|
||||||
static EState * create_estate_for_relation(Relation rel);
|
static EState * create_estate_for_relation(Relation rel);
|
||||||
|
static bytea * DatumToBytea(Datum value, Form_pg_attribute attrForm);
|
||||||
|
static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm);
|
||||||
|
|
||||||
/* constants for cstore_stripe_attr */
|
/* constants for cstore_stripe_attr */
|
||||||
#define Natts_cstore_stripe_attr 6
|
#define Natts_cstore_stripe_attr 5
|
||||||
#define Anum_cstore_stripe_attr_relid 1
|
#define Anum_cstore_stripe_attr_relid 1
|
||||||
#define Anum_cstore_stripe_attr_stripe 2
|
#define Anum_cstore_stripe_attr_stripe 2
|
||||||
#define Anum_cstore_stripe_attr_attr 3
|
#define Anum_cstore_stripe_attr_attr 3
|
||||||
#define Anum_cstore_stripe_attr_exists_size 4
|
#define Anum_cstore_stripe_attr_exists_size 4
|
||||||
#define Anum_cstore_stripe_attr_value_size 5
|
#define Anum_cstore_stripe_attr_value_size 5
|
||||||
#define Anum_cstore_stripe_attr_skiplist_size 6
|
|
||||||
|
|
||||||
/* constants for cstore_table */
|
/* constants for cstore_table */
|
||||||
#define Natts_cstore_tables 4
|
#define Natts_cstore_tables 4
|
||||||
|
@ -77,12 +79,29 @@ static EState * create_estate_for_relation(Relation rel);
|
||||||
#define Anum_cstore_tables_version_minor 4
|
#define Anum_cstore_tables_version_minor 4
|
||||||
|
|
||||||
/* constants for cstore_stripe */
|
/* constants for cstore_stripe */
|
||||||
#define Natts_cstore_stripes 5
|
#define Natts_cstore_stripes 6
|
||||||
#define Anum_cstore_stripes_relid 1
|
#define Anum_cstore_stripes_relid 1
|
||||||
#define Anum_cstore_stripes_stripe 2
|
#define Anum_cstore_stripes_stripe 2
|
||||||
#define Anum_cstore_stripes_file_offset 3
|
#define Anum_cstore_stripes_file_offset 3
|
||||||
#define Anum_cstore_stripes_skiplist_length 4
|
#define Anum_cstore_stripes_data_length 4
|
||||||
#define Anum_cstore_stripes_data_length 5
|
#define Anum_cstore_stripes_block_count 5
|
||||||
|
#define Anum_cstore_stripes_row_count 6
|
||||||
|
|
||||||
|
/* constants for cstore_skipnodes */
|
||||||
|
#define Natts_cstore_skipnodes 12
|
||||||
|
#define Anum_cstore_skipnodes_relid 1
|
||||||
|
#define Anum_cstore_skipnodes_stripe 2
|
||||||
|
#define Anum_cstore_skipnodes_attr 3
|
||||||
|
#define Anum_cstore_skipnodes_block 4
|
||||||
|
#define Anum_cstore_skipnodes_row_count 5
|
||||||
|
#define Anum_cstore_skipnodes_minimum_value 6
|
||||||
|
#define Anum_cstore_skipnodes_maximum_value 7
|
||||||
|
#define Anum_cstore_skipnodes_value_stream_offset 8
|
||||||
|
#define Anum_cstore_skipnodes_value_stream_length 9
|
||||||
|
#define Anum_cstore_skipnodes_exists_stream_offset 10
|
||||||
|
#define Anum_cstore_skipnodes_exists_stream_length 11
|
||||||
|
#define Anum_cstore_skipnodes_value_compression_type 12
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* InitCStoreTableMetadata adds a record for the given relation in cstore_table.
|
* InitCStoreTableMetadata adds a record for the given relation in cstore_table.
|
||||||
|
@ -117,6 +136,185 @@ InitCStoreTableMetadata(Oid relid, int blockRowCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SaveStripeSkipList saves StripeSkipList for a given stripe as rows
|
||||||
|
* of cstore_skipnodes.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
SaveStripeSkipList(Oid relid, uint64 stripe, StripeSkipList *stripeSkipList,
|
||||||
|
TupleDesc tupleDescriptor)
|
||||||
|
{
|
||||||
|
uint32 columnIndex = 0;
|
||||||
|
uint32 blockIndex = 0;
|
||||||
|
Oid cstoreSkipNodesOid = InvalidOid;
|
||||||
|
Relation cstoreSkipNodes = NULL;
|
||||||
|
ModifyState *modifyState = NULL;
|
||||||
|
uint32 columnCount = stripeSkipList->columnCount;
|
||||||
|
|
||||||
|
cstoreSkipNodesOid = CStoreSkipNodesRelationId();
|
||||||
|
cstoreSkipNodes = heap_open(cstoreSkipNodesOid, RowExclusiveLock);
|
||||||
|
modifyState = StartModifyRelation(cstoreSkipNodes);
|
||||||
|
|
||||||
|
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||||
|
{
|
||||||
|
for (blockIndex = 0; blockIndex < stripeSkipList->blockCount; blockIndex++)
|
||||||
|
{
|
||||||
|
ColumnBlockSkipNode *skipNode =
|
||||||
|
&stripeSkipList->blockSkipNodeArray[columnIndex][blockIndex];
|
||||||
|
|
||||||
|
Datum values[Natts_cstore_skipnodes] = {
|
||||||
|
ObjectIdGetDatum(relid),
|
||||||
|
Int64GetDatum(stripe),
|
||||||
|
Int32GetDatum(columnIndex + 1),
|
||||||
|
Int32GetDatum(blockIndex),
|
||||||
|
Int64GetDatum(skipNode->rowCount),
|
||||||
|
0, /* to be filled below */
|
||||||
|
0, /* to be filled below */
|
||||||
|
Int64GetDatum(skipNode->valueBlockOffset),
|
||||||
|
Int64GetDatum(skipNode->valueLength),
|
||||||
|
Int64GetDatum(skipNode->existsBlockOffset),
|
||||||
|
Int64GetDatum(skipNode->existsLength),
|
||||||
|
Int32GetDatum(skipNode->valueCompressionType)
|
||||||
|
};
|
||||||
|
|
||||||
|
bool nulls[Natts_cstore_skipnodes] = { false };
|
||||||
|
|
||||||
|
if (skipNode->hasMinMax)
|
||||||
|
{
|
||||||
|
values[Anum_cstore_skipnodes_minimum_value - 1] =
|
||||||
|
PointerGetDatum(DatumToBytea(skipNode->minimumValue,
|
||||||
|
&tupleDescriptor->attrs[columnIndex]));
|
||||||
|
values[Anum_cstore_skipnodes_maximum_value - 1] =
|
||||||
|
PointerGetDatum(DatumToBytea(skipNode->maximumValue,
|
||||||
|
&tupleDescriptor->attrs[columnIndex]));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
nulls[Anum_cstore_skipnodes_minimum_value - 1] = true;
|
||||||
|
nulls[Anum_cstore_skipnodes_maximum_value - 1] = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
InsertTupleAndEnforceConstraints(modifyState, values, nulls);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
FinishModifyRelation(modifyState);
|
||||||
|
heap_close(cstoreSkipNodes, NoLock);
|
||||||
|
|
||||||
|
CommandCounterIncrement();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ReadStripeSkipList fetches StripeSkipList for a given stripe.
|
||||||
|
*/
|
||||||
|
StripeSkipList *
|
||||||
|
ReadStripeSkipList(Oid relid, uint64 stripe, TupleDesc tupleDescriptor,
|
||||||
|
uint32 blockCount)
|
||||||
|
{
|
||||||
|
StripeSkipList *skipList = NULL;
|
||||||
|
uint32 columnIndex = 0;
|
||||||
|
Oid cstoreSkipNodesOid = InvalidOid;
|
||||||
|
Relation cstoreSkipNodes = NULL;
|
||||||
|
Relation index = NULL;
|
||||||
|
HeapTuple heapTuple = NULL;
|
||||||
|
uint32 columnCount = tupleDescriptor->natts;
|
||||||
|
ScanKeyData scanKey[2];
|
||||||
|
SysScanDesc scanDescriptor = NULL;
|
||||||
|
|
||||||
|
cstoreSkipNodesOid = CStoreSkipNodesRelationId();
|
||||||
|
cstoreSkipNodes = heap_open(cstoreSkipNodesOid, AccessShareLock);
|
||||||
|
index = index_open(CStoreSkipNodesIndexRelationId(), AccessShareLock);
|
||||||
|
|
||||||
|
ScanKeyInit(&scanKey[0], Anum_cstore_skipnodes_relid,
|
||||||
|
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relid));
|
||||||
|
ScanKeyInit(&scanKey[1], Anum_cstore_skipnodes_stripe,
|
||||||
|
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe));
|
||||||
|
|
||||||
|
scanDescriptor = systable_beginscan_ordered(cstoreSkipNodes, index, NULL, 2, scanKey);
|
||||||
|
|
||||||
|
skipList = palloc0(sizeof(StripeSkipList));
|
||||||
|
skipList->blockCount = blockCount;
|
||||||
|
skipList->columnCount = columnCount;
|
||||||
|
skipList->blockSkipNodeArray = palloc0(columnCount * sizeof(ColumnBlockSkipNode *));
|
||||||
|
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||||
|
{
|
||||||
|
skipList->blockSkipNodeArray[columnIndex] =
|
||||||
|
palloc0(blockCount * sizeof(ColumnBlockSkipNode));
|
||||||
|
}
|
||||||
|
|
||||||
|
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
|
||||||
|
{
|
||||||
|
uint32 attr = 0;
|
||||||
|
uint32 blockIndex = 0;
|
||||||
|
ColumnBlockSkipNode *skipNode = NULL;
|
||||||
|
|
||||||
|
Datum datumArray[Natts_cstore_skipnodes];
|
||||||
|
bool isNullArray[Natts_cstore_skipnodes];
|
||||||
|
|
||||||
|
heap_deform_tuple(heapTuple, RelationGetDescr(cstoreSkipNodes), datumArray,
|
||||||
|
isNullArray);
|
||||||
|
|
||||||
|
attr = DatumGetInt32(datumArray[Anum_cstore_skipnodes_attr - 1]);
|
||||||
|
blockIndex = DatumGetInt32(datumArray[Anum_cstore_skipnodes_block - 1]);
|
||||||
|
|
||||||
|
if (attr <= 0 || attr > columnCount)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("invalid stripe skipnode entry"),
|
||||||
|
errdetail("Attribute number out of range: %u", attr)));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (blockIndex < 0 || blockIndex >= blockCount)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("invalid stripe skipnode entry"),
|
||||||
|
errdetail("Block number out of range: %u", blockIndex)));
|
||||||
|
}
|
||||||
|
|
||||||
|
columnIndex = attr - 1;
|
||||||
|
|
||||||
|
skipNode = &skipList->blockSkipNodeArray[columnIndex][blockIndex];
|
||||||
|
skipNode->rowCount = DatumGetInt64(datumArray[Anum_cstore_skipnodes_row_count -
|
||||||
|
1]);
|
||||||
|
skipNode->valueBlockOffset =
|
||||||
|
DatumGetInt64(datumArray[Anum_cstore_skipnodes_value_stream_offset - 1]);
|
||||||
|
skipNode->valueLength =
|
||||||
|
DatumGetInt64(datumArray[Anum_cstore_skipnodes_value_stream_length - 1]);
|
||||||
|
skipNode->existsBlockOffset =
|
||||||
|
DatumGetInt64(datumArray[Anum_cstore_skipnodes_exists_stream_offset - 1]);
|
||||||
|
skipNode->existsLength =
|
||||||
|
DatumGetInt64(datumArray[Anum_cstore_skipnodes_exists_stream_length - 1]);
|
||||||
|
skipNode->valueCompressionType =
|
||||||
|
DatumGetInt32(datumArray[Anum_cstore_skipnodes_value_compression_type - 1]);
|
||||||
|
|
||||||
|
if (isNullArray[Anum_cstore_skipnodes_minimum_value - 1] ||
|
||||||
|
isNullArray[Anum_cstore_skipnodes_maximum_value - 1])
|
||||||
|
{
|
||||||
|
skipNode->hasMinMax = false;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
bytea *minValue = DatumGetByteaP(
|
||||||
|
datumArray[Anum_cstore_skipnodes_minimum_value - 1]);
|
||||||
|
bytea *maxValue = DatumGetByteaP(
|
||||||
|
datumArray[Anum_cstore_skipnodes_maximum_value - 1]);
|
||||||
|
|
||||||
|
skipNode->minimumValue =
|
||||||
|
ByteaToDatum(minValue, &tupleDescriptor->attrs[columnIndex]);
|
||||||
|
skipNode->maximumValue =
|
||||||
|
ByteaToDatum(maxValue, &tupleDescriptor->attrs[columnIndex]);
|
||||||
|
|
||||||
|
skipNode->hasMinMax = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
systable_endscan_ordered(scanDescriptor);
|
||||||
|
index_close(index, NoLock);
|
||||||
|
heap_close(cstoreSkipNodes, NoLock);
|
||||||
|
|
||||||
|
return skipList;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* InsertStripeMetadataRow adds a row to cstore_stripes.
|
* InsertStripeMetadataRow adds a row to cstore_stripes.
|
||||||
*/
|
*/
|
||||||
|
@ -128,8 +326,9 @@ InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe)
|
||||||
ObjectIdGetDatum(relid),
|
ObjectIdGetDatum(relid),
|
||||||
Int64GetDatum(stripe->id),
|
Int64GetDatum(stripe->id),
|
||||||
Int64GetDatum(stripe->fileOffset),
|
Int64GetDatum(stripe->fileOffset),
|
||||||
Int64GetDatum(stripe->skipListLength),
|
Int64GetDatum(stripe->dataLength),
|
||||||
Int64GetDatum(stripe->dataLength)
|
Int32GetDatum(stripe->blockCount),
|
||||||
|
Int64GetDatum(stripe->rowCount)
|
||||||
};
|
};
|
||||||
|
|
||||||
Oid cstoreStripesOid = CStoreStripesRelationId();
|
Oid cstoreStripesOid = CStoreStripesRelationId();
|
||||||
|
@ -187,8 +386,10 @@ ReadTableMetadata(Oid relid)
|
||||||
datumArray[Anum_cstore_stripes_file_offset - 1]);
|
datumArray[Anum_cstore_stripes_file_offset - 1]);
|
||||||
stripeMetadata->dataLength = DatumGetInt64(
|
stripeMetadata->dataLength = DatumGetInt64(
|
||||||
datumArray[Anum_cstore_stripes_data_length - 1]);
|
datumArray[Anum_cstore_stripes_data_length - 1]);
|
||||||
stripeMetadata->skipListLength = DatumGetInt64(
|
stripeMetadata->blockCount = DatumGetInt32(
|
||||||
datumArray[Anum_cstore_stripes_skiplist_length - 1]);
|
datumArray[Anum_cstore_stripes_block_count - 1]);
|
||||||
|
stripeMetadata->rowCount = DatumGetInt64(
|
||||||
|
datumArray[Anum_cstore_stripes_row_count - 1]);
|
||||||
|
|
||||||
tableMetadata->stripeMetadataList = lappend(tableMetadata->stripeMetadataList,
|
tableMetadata->stripeMetadataList = lappend(tableMetadata->stripeMetadataList,
|
||||||
stripeMetadata);
|
stripeMetadata);
|
||||||
|
@ -299,8 +500,7 @@ SaveStripeFooter(Oid relid, uint64 stripe, StripeFooter *footer)
|
||||||
Int64GetDatum(stripe),
|
Int64GetDatum(stripe),
|
||||||
Int16GetDatum(attr),
|
Int16GetDatum(attr),
|
||||||
Int64GetDatum(footer->existsSizeArray[attr - 1]),
|
Int64GetDatum(footer->existsSizeArray[attr - 1]),
|
||||||
Int64GetDatum(footer->valueSizeArray[attr - 1]),
|
Int64GetDatum(footer->valueSizeArray[attr - 1])
|
||||||
Int64GetDatum(footer->skipListSizeArray[attr - 1])
|
|
||||||
};
|
};
|
||||||
|
|
||||||
InsertTupleAndEnforceConstraints(modifyState, values, nulls);
|
InsertTupleAndEnforceConstraints(modifyState, values, nulls);
|
||||||
|
@ -339,7 +539,6 @@ ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount)
|
||||||
footer = palloc0(sizeof(StripeFooter));
|
footer = palloc0(sizeof(StripeFooter));
|
||||||
footer->existsSizeArray = palloc0(relationColumnCount * sizeof(int64));
|
footer->existsSizeArray = palloc0(relationColumnCount * sizeof(int64));
|
||||||
footer->valueSizeArray = palloc0(relationColumnCount * sizeof(int64));
|
footer->valueSizeArray = palloc0(relationColumnCount * sizeof(int64));
|
||||||
footer->skipListSizeArray = palloc0(relationColumnCount * sizeof(int64));
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Stripe can have less columns than the relation if ALTER TABLE happens
|
* Stripe can have less columns than the relation if ALTER TABLE happens
|
||||||
|
@ -369,8 +568,6 @@ ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount)
|
||||||
DatumGetInt64(datumArray[Anum_cstore_stripe_attr_exists_size - 1]);
|
DatumGetInt64(datumArray[Anum_cstore_stripe_attr_exists_size - 1]);
|
||||||
footer->valueSizeArray[attr - 1] =
|
footer->valueSizeArray[attr - 1] =
|
||||||
DatumGetInt64(datumArray[Anum_cstore_stripe_attr_value_size - 1]);
|
DatumGetInt64(datumArray[Anum_cstore_stripe_attr_value_size - 1]);
|
||||||
footer->skipListSizeArray[attr - 1] =
|
|
||||||
DatumGetInt64(datumArray[Anum_cstore_stripe_attr_skiplist_size - 1]);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
systable_endscan_ordered(scanDescriptor);
|
systable_endscan_ordered(scanDescriptor);
|
||||||
|
@ -507,6 +704,55 @@ create_estate_for_relation(Relation rel)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* DatumToBytea serializes a datum into a bytea value.
|
||||||
|
*/
|
||||||
|
static bytea *
|
||||||
|
DatumToBytea(Datum value, Form_pg_attribute attrForm)
|
||||||
|
{
|
||||||
|
int datumLength = att_addlength_datum(0, attrForm->attlen, value);
|
||||||
|
bytea *result = palloc0(datumLength + VARHDRSZ);
|
||||||
|
|
||||||
|
SET_VARSIZE(result, datumLength + VARHDRSZ);
|
||||||
|
|
||||||
|
if (attrForm->attlen > 0)
|
||||||
|
{
|
||||||
|
if (attrForm->attbyval)
|
||||||
|
{
|
||||||
|
store_att_byval(VARDATA(result), value, attrForm->attlen);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
memcpy(VARDATA(result), DatumGetPointer(value), attrForm->attlen);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
memcpy(VARDATA(result), DatumGetPointer(value), datumLength);
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ByteaToDatum deserializes a value which was previously serialized using
|
||||||
|
* DatumToBytea.
|
||||||
|
*/
|
||||||
|
static Datum
|
||||||
|
ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* We copy the data so the result of this function lives even
|
||||||
|
* after the byteaDatum is freed.
|
||||||
|
*/
|
||||||
|
char *binaryDataCopy = palloc0(VARSIZE_ANY_EXHDR(bytes));
|
||||||
|
memcpy(binaryDataCopy, VARDATA_ANY(bytes), VARSIZE_ANY_EXHDR(bytes));
|
||||||
|
|
||||||
|
return fetch_att(binaryDataCopy, attrForm->attbyval, attrForm->attlen);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CStoreStripeAttrRelationId returns relation id of cstore_stripe_attr.
|
* CStoreStripeAttrRelationId returns relation id of cstore_stripe_attr.
|
||||||
* TODO: should we cache this similar to citus?
|
* TODO: should we cache this similar to citus?
|
||||||
|
@ -573,6 +819,28 @@ CStoreTablesIndexRelationId(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CStoreSkipNodesRelationId returns relation id of cstore_skipnodes.
|
||||||
|
* TODO: should we cache this similar to citus?
|
||||||
|
*/
|
||||||
|
static Oid
|
||||||
|
CStoreSkipNodesRelationId(void)
|
||||||
|
{
|
||||||
|
return get_relname_relid("cstore_skipnodes", CStoreNamespaceId());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CStoreSkipNodesIndexRelationId returns relation id of cstore_skipnodes_pkey.
|
||||||
|
* TODO: should we cache this similar to citus?
|
||||||
|
*/
|
||||||
|
static Oid
|
||||||
|
CStoreSkipNodesIndexRelationId(void)
|
||||||
|
{
|
||||||
|
return get_relname_relid("cstore_skipnodes_pkey", CStoreNamespaceId());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CStoreNamespaceId returns namespace id of the schema we store cstore
|
* CStoreNamespaceId returns namespace id of the schema we store cstore
|
||||||
* related tables.
|
* related tables.
|
||||||
|
|
146
cstore_reader.c
146
cstore_reader.c
|
@ -34,7 +34,6 @@
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
|
|
||||||
#include "cstore.h"
|
#include "cstore.h"
|
||||||
#include "cstore_metadata_serialization.h"
|
|
||||||
#include "cstore_version_compat.h"
|
#include "cstore_version_compat.h"
|
||||||
|
|
||||||
/* static function declarations */
|
/* static function declarations */
|
||||||
|
@ -53,12 +52,6 @@ static ColumnBuffers * LoadColumnBuffers(Relation relation,
|
||||||
uint32 blockCount, uint64 existsFileOffset,
|
uint32 blockCount, uint64 existsFileOffset,
|
||||||
uint64 valueFileOffset,
|
uint64 valueFileOffset,
|
||||||
Form_pg_attribute attributeForm);
|
Form_pg_attribute attributeForm);
|
||||||
static StripeSkipList * LoadStripeSkipList(Relation relation,
|
|
||||||
StripeMetadata *stripeMetadata,
|
|
||||||
StripeFooter *stripeFooter,
|
|
||||||
uint32 columnCount,
|
|
||||||
bool *projectedColumnMask,
|
|
||||||
TupleDesc tupleDescriptor);
|
|
||||||
static bool * SelectedBlockMask(StripeSkipList *stripeSkipList,
|
static bool * SelectedBlockMask(StripeSkipList *stripeSkipList,
|
||||||
List *projectedColumnList, List *whereClauseList);
|
List *projectedColumnList, List *whereClauseList);
|
||||||
static List * BuildRestrictInfoList(List *whereClauseList);
|
static List * BuildRestrictInfoList(List *whereClauseList);
|
||||||
|
@ -85,8 +78,6 @@ static Datum ColumnDefaultValue(TupleConstr *tupleConstraints,
|
||||||
static StringInfo ReadFromSmgr(Relation rel, uint64 offset, uint32 size);
|
static StringInfo ReadFromSmgr(Relation rel, uint64 offset, uint32 size);
|
||||||
static void ResetUncompressedBlockData(ColumnBlockData **blockDataArray,
|
static void ResetUncompressedBlockData(ColumnBlockData **blockDataArray,
|
||||||
uint32 columnCount);
|
uint32 columnCount);
|
||||||
static uint64 StripeRowCount(Relation relation, StripeMetadata *stripeMetadata);
|
|
||||||
static int RelationColumnCount(Oid relid);
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -327,34 +318,13 @@ CStoreTableRowCount(Relation relation)
|
||||||
foreach(stripeMetadataCell, tableMetadata->stripeMetadataList)
|
foreach(stripeMetadataCell, tableMetadata->stripeMetadataList)
|
||||||
{
|
{
|
||||||
StripeMetadata *stripeMetadata = (StripeMetadata *) lfirst(stripeMetadataCell);
|
StripeMetadata *stripeMetadata = (StripeMetadata *) lfirst(stripeMetadataCell);
|
||||||
totalRowCount += StripeRowCount(relation, stripeMetadata);
|
totalRowCount += stripeMetadata->rowCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
return totalRowCount;
|
return totalRowCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* StripeRowCount reads serialized stripe footer, the first column's
|
|
||||||
* skip list, and returns number of rows for given stripe.
|
|
||||||
*/
|
|
||||||
static uint64
|
|
||||||
StripeRowCount(Relation relation, StripeMetadata *stripeMetadata)
|
|
||||||
{
|
|
||||||
uint64 rowCount = 0;
|
|
||||||
StringInfo firstColumnSkipListBuffer = NULL;
|
|
||||||
|
|
||||||
StripeFooter *stripeFooter = ReadStripeFooter(relation->rd_id, stripeMetadata->id,
|
|
||||||
RelationColumnCount(relation->rd_id));
|
|
||||||
|
|
||||||
firstColumnSkipListBuffer = ReadFromSmgr(relation, stripeMetadata->fileOffset,
|
|
||||||
stripeFooter->skipListSizeArray[0]);
|
|
||||||
rowCount = DeserializeRowCount(firstColumnSkipListBuffer);
|
|
||||||
|
|
||||||
return rowCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* LoadFilteredStripeBuffers reads serialized stripe data from the given file.
|
* LoadFilteredStripeBuffers reads serialized stripe data from the given file.
|
||||||
* The function skips over blocks whose rows are refuted by restriction qualifiers,
|
* The function skips over blocks whose rows are refuted by restriction qualifiers,
|
||||||
|
@ -373,10 +343,10 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
||||||
|
|
||||||
bool *projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList);
|
bool *projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList);
|
||||||
|
|
||||||
StripeSkipList *stripeSkipList = LoadStripeSkipList(relation, stripeMetadata,
|
StripeSkipList *stripeSkipList = ReadStripeSkipList(RelationGetRelid(relation),
|
||||||
stripeFooter, columnCount,
|
stripeMetadata->id,
|
||||||
projectedColumnMask,
|
tupleDescriptor,
|
||||||
tupleDescriptor);
|
stripeMetadata->blockCount);
|
||||||
|
|
||||||
bool *selectedBlockMask = SelectedBlockMask(stripeSkipList, projectedColumnList,
|
bool *selectedBlockMask = SelectedBlockMask(stripeSkipList, projectedColumnList,
|
||||||
whereClauseList);
|
whereClauseList);
|
||||||
|
@ -387,7 +357,7 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
||||||
|
|
||||||
/* load column data for projected columns */
|
/* load column data for projected columns */
|
||||||
columnBuffersArray = palloc0(columnCount * sizeof(ColumnBuffers *));
|
columnBuffersArray = palloc0(columnCount * sizeof(ColumnBuffers *));
|
||||||
currentColumnFileOffset = stripeMetadata->fileOffset + stripeMetadata->skipListLength;
|
currentColumnFileOffset = stripeMetadata->fileOffset;
|
||||||
|
|
||||||
for (columnIndex = 0; columnIndex < stripeFooter->columnCount; columnIndex++)
|
for (columnIndex = 0; columnIndex < stripeFooter->columnCount; columnIndex++)
|
||||||
{
|
{
|
||||||
|
@ -511,98 +481,6 @@ LoadColumnBuffers(Relation relation, ColumnBlockSkipNode *blockSkipNodeArray,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* Reads the skip list for the given stripe. */
|
|
||||||
static StripeSkipList *
|
|
||||||
LoadStripeSkipList(Relation relation,
|
|
||||||
StripeMetadata *stripeMetadata,
|
|
||||||
StripeFooter *stripeFooter, uint32 columnCount,
|
|
||||||
bool *projectedColumnMask,
|
|
||||||
TupleDesc tupleDescriptor)
|
|
||||||
{
|
|
||||||
StripeSkipList *stripeSkipList = NULL;
|
|
||||||
ColumnBlockSkipNode **blockSkipNodeArray = NULL;
|
|
||||||
StringInfo firstColumnSkipListBuffer = NULL;
|
|
||||||
uint64 currentColumnSkipListFileOffset = 0;
|
|
||||||
uint32 columnIndex = 0;
|
|
||||||
uint32 stripeBlockCount = 0;
|
|
||||||
uint32 stripeColumnCount = stripeFooter->columnCount;
|
|
||||||
|
|
||||||
/* deserialize block count */
|
|
||||||
firstColumnSkipListBuffer = ReadFromSmgr(relation, stripeMetadata->fileOffset,
|
|
||||||
stripeFooter->skipListSizeArray[0]);
|
|
||||||
stripeBlockCount = DeserializeBlockCount(firstColumnSkipListBuffer);
|
|
||||||
|
|
||||||
/* deserialize column skip lists */
|
|
||||||
blockSkipNodeArray = palloc0(columnCount * sizeof(ColumnBlockSkipNode *));
|
|
||||||
currentColumnSkipListFileOffset = stripeMetadata->fileOffset;
|
|
||||||
|
|
||||||
for (columnIndex = 0; columnIndex < stripeColumnCount; columnIndex++)
|
|
||||||
{
|
|
||||||
uint64 columnSkipListSize = stripeFooter->skipListSizeArray[columnIndex];
|
|
||||||
bool firstColumn = columnIndex == 0;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Only selected columns' column skip lists are read. However, the first
|
|
||||||
* column's skip list is read regardless of being selected. It is used by
|
|
||||||
* StripeSkipListRowCount later.
|
|
||||||
*/
|
|
||||||
if (projectedColumnMask[columnIndex] || firstColumn)
|
|
||||||
{
|
|
||||||
Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, columnIndex);
|
|
||||||
|
|
||||||
StringInfo columnSkipListBuffer =
|
|
||||||
ReadFromSmgr(relation, currentColumnSkipListFileOffset,
|
|
||||||
columnSkipListSize);
|
|
||||||
ColumnBlockSkipNode *columnSkipList =
|
|
||||||
DeserializeColumnSkipList(columnSkipListBuffer, attributeForm->attbyval,
|
|
||||||
attributeForm->attlen, stripeBlockCount);
|
|
||||||
blockSkipNodeArray[columnIndex] = columnSkipList;
|
|
||||||
}
|
|
||||||
|
|
||||||
currentColumnSkipListFileOffset += columnSkipListSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* table contains additional columns added after this stripe is created */
|
|
||||||
for (columnIndex = stripeColumnCount; columnIndex < columnCount; columnIndex++)
|
|
||||||
{
|
|
||||||
ColumnBlockSkipNode *columnSkipList = NULL;
|
|
||||||
uint32 blockIndex = 0;
|
|
||||||
bool firstColumn = columnIndex == 0;
|
|
||||||
|
|
||||||
/* no need to create ColumnBlockSkipList if the column is not selected */
|
|
||||||
if (!projectedColumnMask[columnIndex] && !firstColumn)
|
|
||||||
{
|
|
||||||
blockSkipNodeArray[columnIndex] = NULL;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* create empty ColumnBlockSkipNode for missing columns*/
|
|
||||||
columnSkipList = palloc0(stripeBlockCount * sizeof(ColumnBlockSkipNode));
|
|
||||||
|
|
||||||
for (blockIndex = 0; blockIndex < stripeBlockCount; blockIndex++)
|
|
||||||
{
|
|
||||||
columnSkipList[blockIndex].rowCount = 0;
|
|
||||||
columnSkipList[blockIndex].hasMinMax = false;
|
|
||||||
columnSkipList[blockIndex].minimumValue = 0;
|
|
||||||
columnSkipList[blockIndex].maximumValue = 0;
|
|
||||||
columnSkipList[blockIndex].existsBlockOffset = 0;
|
|
||||||
columnSkipList[blockIndex].valueBlockOffset = 0;
|
|
||||||
columnSkipList[blockIndex].existsLength = 0;
|
|
||||||
columnSkipList[blockIndex].valueLength = 0;
|
|
||||||
columnSkipList[blockIndex].valueCompressionType = COMPRESSION_NONE;
|
|
||||||
}
|
|
||||||
blockSkipNodeArray[columnIndex] = columnSkipList;
|
|
||||||
}
|
|
||||||
|
|
||||||
stripeSkipList = palloc0(sizeof(StripeSkipList));
|
|
||||||
stripeSkipList->blockSkipNodeArray = blockSkipNodeArray;
|
|
||||||
stripeSkipList->columnCount = columnCount;
|
|
||||||
stripeSkipList->blockCount = stripeBlockCount;
|
|
||||||
|
|
||||||
return stripeSkipList;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SelectedBlockMask walks over each column's blocks and checks if a block can
|
* SelectedBlockMask walks over each column's blocks and checks if a block can
|
||||||
* be filtered without reading its data. The filtering happens when all rows in
|
* be filtered without reading its data. The filtering happens when all rows in
|
||||||
|
@ -1207,15 +1085,3 @@ ResetUncompressedBlockData(ColumnBlockData **blockDataArray, uint32 columnCount)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int
|
|
||||||
RelationColumnCount(Oid relid)
|
|
||||||
{
|
|
||||||
Relation rel = RelationIdGetRelation(relid);
|
|
||||||
TupleDesc tupleDesc = RelationGetDescr(rel);
|
|
||||||
int columnCount = tupleDesc->natts;
|
|
||||||
RelationClose(rel);
|
|
||||||
|
|
||||||
return columnCount;
|
|
||||||
}
|
|
||||||
|
|
|
@ -24,7 +24,6 @@
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
|
|
||||||
#include "cstore.h"
|
#include "cstore.h"
|
||||||
#include "cstore_metadata_serialization.h"
|
|
||||||
#include "cstore_version_compat.h"
|
#include "cstore_version_compat.h"
|
||||||
|
|
||||||
static StripeBuffers * CreateEmptyStripeBuffers(uint32 stripeMaxRowCount,
|
static StripeBuffers * CreateEmptyStripeBuffers(uint32 stripeMaxRowCount,
|
||||||
|
@ -34,10 +33,7 @@ static StripeSkipList * CreateEmptyStripeSkipList(uint32 stripeMaxRowCount,
|
||||||
uint32 blockRowCount,
|
uint32 blockRowCount,
|
||||||
uint32 columnCount);
|
uint32 columnCount);
|
||||||
static StripeMetadata FlushStripe(TableWriteState *writeState);
|
static StripeMetadata FlushStripe(TableWriteState *writeState);
|
||||||
static StringInfo * CreateSkipListBufferArray(StripeSkipList *stripeSkipList,
|
static StripeFooter * CreateStripeFooter(StripeSkipList *stripeSkipList);
|
||||||
TupleDesc tupleDescriptor);
|
|
||||||
static StripeFooter * CreateStripeFooter(StripeSkipList *stripeSkipList,
|
|
||||||
StringInfo *skipListBufferArray);
|
|
||||||
static StringInfo SerializeBoolArray(bool *boolArray, uint32 boolArrayLength);
|
static StringInfo SerializeBoolArray(bool *boolArray, uint32 boolArrayLength);
|
||||||
static void SerializeSingleDatum(StringInfo datumBuffer, Datum datum,
|
static void SerializeSingleDatum(StringInfo datumBuffer, Datum datum,
|
||||||
bool datumTypeByValue, int datumTypeLength,
|
bool datumTypeByValue, int datumTypeLength,
|
||||||
|
@ -90,9 +86,7 @@ CStoreBeginWrite(Oid relationId,
|
||||||
uint64 lastStripeSize = 0;
|
uint64 lastStripeSize = 0;
|
||||||
|
|
||||||
lastStripe = llast(tableMetadata->stripeMetadataList);
|
lastStripe = llast(tableMetadata->stripeMetadataList);
|
||||||
lastStripeSize += lastStripe->skipListLength;
|
|
||||||
lastStripeSize += lastStripe->dataLength;
|
lastStripeSize += lastStripe->dataLength;
|
||||||
lastStripeSize += lastStripe->footerLength;
|
|
||||||
|
|
||||||
currentFileOffset = lastStripe->fileOffset + lastStripeSize;
|
currentFileOffset = lastStripe->fileOffset + lastStripeSize;
|
||||||
currentStripeId = lastStripe->id + 1;
|
currentStripeId = lastStripe->id + 1;
|
||||||
|
@ -429,10 +423,8 @@ WriteToSmgr(TableWriteState *writeState, char *data, uint32 dataLength)
|
||||||
static StripeMetadata
|
static StripeMetadata
|
||||||
FlushStripe(TableWriteState *writeState)
|
FlushStripe(TableWriteState *writeState)
|
||||||
{
|
{
|
||||||
StripeMetadata stripeMetadata = { 0, 0, 0, 0 };
|
StripeMetadata stripeMetadata = { 0 };
|
||||||
uint64 skipListLength = 0;
|
|
||||||
uint64 dataLength = 0;
|
uint64 dataLength = 0;
|
||||||
StringInfo *skipListBufferArray = NULL;
|
|
||||||
StripeFooter *stripeFooter = NULL;
|
StripeFooter *stripeFooter = NULL;
|
||||||
uint32 columnIndex = 0;
|
uint32 columnIndex = 0;
|
||||||
uint32 blockIndex = 0;
|
uint32 blockIndex = 0;
|
||||||
|
@ -486,32 +478,21 @@ FlushStripe(TableWriteState *writeState)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* create skip list and footer buffers */
|
/* create skip list and footer buffers */
|
||||||
skipListBufferArray = CreateSkipListBufferArray(stripeSkipList, tupleDescriptor);
|
SaveStripeSkipList(writeState->relationId, writeState->currentStripeId,
|
||||||
stripeFooter = CreateStripeFooter(stripeSkipList, skipListBufferArray);
|
stripeSkipList, tupleDescriptor);
|
||||||
|
stripeFooter = CreateStripeFooter(stripeSkipList);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Each stripe has three sections:
|
* Each stripe has only one section:
|
||||||
* (1) Skip list, which contains statistics for each column block, and can
|
* Data section, in which we store data for each column continuously.
|
||||||
* be used to skip reading row blocks that are refuted by WHERE clause list,
|
|
||||||
* (2) Data section, in which we store data for each column continuously.
|
|
||||||
* We store data for each for each column in blocks. For each block, we
|
* We store data for each for each column in blocks. For each block, we
|
||||||
* store two buffers: "exists" buffer, and "value" buffer. "exists" buffer
|
* store two buffers: "exists" buffer, and "value" buffer. "exists" buffer
|
||||||
* tells which values are not NULL. "value" buffer contains values for
|
* tells which values are not NULL. "value" buffer contains values for
|
||||||
* present values. For each column, we first store all "exists" buffers,
|
* present values. For each column, we first store all "exists" buffers,
|
||||||
* and then all "value" buffers.
|
* and then all "value" buffers.
|
||||||
* (3) Stripe footer, which contains the skip list buffer size, exists buffer
|
|
||||||
* size, and value buffer size for each of the columns.
|
|
||||||
*
|
|
||||||
* We start by flushing the skip list buffers.
|
|
||||||
*/
|
*/
|
||||||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
|
||||||
{
|
|
||||||
StringInfo skipListBuffer = skipListBufferArray[columnIndex];
|
|
||||||
WriteToSmgr(writeState, skipListBuffer->data, skipListBuffer->len);
|
|
||||||
writeState->currentFileOffset += skipListBuffer->len;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* then, we flush the data buffers */
|
/* flush the data buffers */
|
||||||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||||
{
|
{
|
||||||
ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex];
|
ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex];
|
||||||
|
@ -546,60 +527,32 @@ FlushStripe(TableWriteState *writeState)
|
||||||
/* set stripe metadata */
|
/* set stripe metadata */
|
||||||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||||
{
|
{
|
||||||
skipListLength += stripeFooter->skipListSizeArray[columnIndex];
|
|
||||||
dataLength += stripeFooter->existsSizeArray[columnIndex];
|
dataLength += stripeFooter->existsSizeArray[columnIndex];
|
||||||
dataLength += stripeFooter->valueSizeArray[columnIndex];
|
dataLength += stripeFooter->valueSizeArray[columnIndex];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
|
||||||
|
{
|
||||||
|
stripeMetadata.rowCount +=
|
||||||
|
stripeSkipList->blockSkipNodeArray[0][blockIndex].rowCount;
|
||||||
|
}
|
||||||
|
|
||||||
stripeMetadata.fileOffset = initialFileOffset;
|
stripeMetadata.fileOffset = initialFileOffset;
|
||||||
stripeMetadata.skipListLength = skipListLength;
|
|
||||||
stripeMetadata.dataLength = dataLength;
|
stripeMetadata.dataLength = dataLength;
|
||||||
stripeMetadata.footerLength = 0;
|
|
||||||
stripeMetadata.id = writeState->currentStripeId;
|
stripeMetadata.id = writeState->currentStripeId;
|
||||||
|
stripeMetadata.blockCount = blockCount;
|
||||||
|
|
||||||
return stripeMetadata;
|
return stripeMetadata;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CreateSkipListBufferArray serializes the skip list for each column of the
|
|
||||||
* given stripe and returns the result as an array.
|
|
||||||
*/
|
|
||||||
static StringInfo *
|
|
||||||
CreateSkipListBufferArray(StripeSkipList *stripeSkipList, TupleDesc tupleDescriptor)
|
|
||||||
{
|
|
||||||
StringInfo *skipListBufferArray = NULL;
|
|
||||||
uint32 columnIndex = 0;
|
|
||||||
uint32 columnCount = stripeSkipList->columnCount;
|
|
||||||
|
|
||||||
skipListBufferArray = palloc0(columnCount * sizeof(StringInfo));
|
|
||||||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
|
||||||
{
|
|
||||||
StringInfo skipListBuffer = NULL;
|
|
||||||
ColumnBlockSkipNode *blockSkipNodeArray =
|
|
||||||
stripeSkipList->blockSkipNodeArray[columnIndex];
|
|
||||||
Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, columnIndex);
|
|
||||||
|
|
||||||
skipListBuffer = SerializeColumnSkipList(blockSkipNodeArray,
|
|
||||||
stripeSkipList->blockCount,
|
|
||||||
attributeForm->attbyval,
|
|
||||||
attributeForm->attlen);
|
|
||||||
|
|
||||||
skipListBufferArray[columnIndex] = skipListBuffer;
|
|
||||||
}
|
|
||||||
|
|
||||||
return skipListBufferArray;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* Creates and returns the footer for given stripe. */
|
/* Creates and returns the footer for given stripe. */
|
||||||
static StripeFooter *
|
static StripeFooter *
|
||||||
CreateStripeFooter(StripeSkipList *stripeSkipList, StringInfo *skipListBufferArray)
|
CreateStripeFooter(StripeSkipList *stripeSkipList)
|
||||||
{
|
{
|
||||||
StripeFooter *stripeFooter = NULL;
|
StripeFooter *stripeFooter = NULL;
|
||||||
uint32 columnIndex = 0;
|
uint32 columnIndex = 0;
|
||||||
uint32 columnCount = stripeSkipList->columnCount;
|
uint32 columnCount = stripeSkipList->columnCount;
|
||||||
uint64 *skipListSizeArray = palloc0(columnCount * sizeof(uint64));
|
|
||||||
uint64 *existsSizeArray = palloc0(columnCount * sizeof(uint64));
|
uint64 *existsSizeArray = palloc0(columnCount * sizeof(uint64));
|
||||||
uint64 *valueSizeArray = palloc0(columnCount * sizeof(uint64));
|
uint64 *valueSizeArray = palloc0(columnCount * sizeof(uint64));
|
||||||
|
|
||||||
|
@ -614,12 +567,10 @@ CreateStripeFooter(StripeSkipList *stripeSkipList, StringInfo *skipListBufferArr
|
||||||
existsSizeArray[columnIndex] += blockSkipNodeArray[blockIndex].existsLength;
|
existsSizeArray[columnIndex] += blockSkipNodeArray[blockIndex].existsLength;
|
||||||
valueSizeArray[columnIndex] += blockSkipNodeArray[blockIndex].valueLength;
|
valueSizeArray[columnIndex] += blockSkipNodeArray[blockIndex].valueLength;
|
||||||
}
|
}
|
||||||
skipListSizeArray[columnIndex] = skipListBufferArray[columnIndex]->len;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
stripeFooter = palloc0(sizeof(StripeFooter));
|
stripeFooter = palloc0(sizeof(StripeFooter));
|
||||||
stripeFooter->columnCount = columnCount;
|
stripeFooter->columnCount = columnCount;
|
||||||
stripeFooter->skipListSizeArray = skipListSizeArray;
|
|
||||||
stripeFooter->existsSizeArray = existsSizeArray;
|
stripeFooter->existsSizeArray = existsSizeArray;
|
||||||
stripeFooter->valueSizeArray = valueSizeArray;
|
stripeFooter->valueSizeArray = valueSizeArray;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue