mirror of https://github.com/citusdata/citus.git
Columnar: track decompressed length in metadata
parent
d4f5d4a27b
commit
01da2a1c73
|
@ -137,7 +137,7 @@ typedef FormData_cstore_options *Form_cstore_options;
|
||||||
#define Anum_cstore_stripes_row_count 8
|
#define Anum_cstore_stripes_row_count 8
|
||||||
|
|
||||||
/* constants for cstore_skipnodes */
|
/* constants for cstore_skipnodes */
|
||||||
#define Natts_cstore_skipnodes 12
|
#define Natts_cstore_skipnodes 13
|
||||||
#define Anum_cstore_skipnodes_storageid 1
|
#define Anum_cstore_skipnodes_storageid 1
|
||||||
#define Anum_cstore_skipnodes_stripe 2
|
#define Anum_cstore_skipnodes_stripe 2
|
||||||
#define Anum_cstore_skipnodes_attr 3
|
#define Anum_cstore_skipnodes_attr 3
|
||||||
|
@ -150,6 +150,7 @@ typedef FormData_cstore_options *Form_cstore_options;
|
||||||
#define Anum_cstore_skipnodes_exists_stream_offset 10
|
#define Anum_cstore_skipnodes_exists_stream_offset 10
|
||||||
#define Anum_cstore_skipnodes_exists_stream_length 11
|
#define Anum_cstore_skipnodes_exists_stream_length 11
|
||||||
#define Anum_cstore_skipnodes_value_compression_type 12
|
#define Anum_cstore_skipnodes_value_compression_type 12
|
||||||
|
#define Anum_cstore_skipnodes_value_decompressed_size 13
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -416,7 +417,8 @@ SaveStripeSkipList(RelFileNode relfilenode, uint64 stripe, StripeSkipList *strip
|
||||||
Int64GetDatum(skipNode->valueLength),
|
Int64GetDatum(skipNode->valueLength),
|
||||||
Int64GetDatum(skipNode->existsChunkOffset),
|
Int64GetDatum(skipNode->existsChunkOffset),
|
||||||
Int64GetDatum(skipNode->existsLength),
|
Int64GetDatum(skipNode->existsLength),
|
||||||
Int32GetDatum(skipNode->valueCompressionType)
|
Int32GetDatum(skipNode->valueCompressionType),
|
||||||
|
Int64GetDatum(skipNode->decompressedValueSize)
|
||||||
};
|
};
|
||||||
|
|
||||||
bool nulls[Natts_cstore_skipnodes] = { false };
|
bool nulls[Natts_cstore_skipnodes] = { false };
|
||||||
|
@ -522,6 +524,8 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri
|
||||||
DatumGetInt64(datumArray[Anum_cstore_skipnodes_exists_stream_length - 1]);
|
DatumGetInt64(datumArray[Anum_cstore_skipnodes_exists_stream_length - 1]);
|
||||||
skipNode->valueCompressionType =
|
skipNode->valueCompressionType =
|
||||||
DatumGetInt32(datumArray[Anum_cstore_skipnodes_value_compression_type - 1]);
|
DatumGetInt32(datumArray[Anum_cstore_skipnodes_value_compression_type - 1]);
|
||||||
|
skipNode->decompressedValueSize =
|
||||||
|
DatumGetInt64(datumArray[Anum_cstore_skipnodes_value_decompressed_size - 1]);
|
||||||
|
|
||||||
if (isNullArray[Anum_cstore_skipnodes_minimum_value - 1] ||
|
if (isNullArray[Anum_cstore_skipnodes_minimum_value - 1] ||
|
||||||
isNullArray[Anum_cstore_skipnodes_maximum_value - 1])
|
isNullArray[Anum_cstore_skipnodes_maximum_value - 1])
|
||||||
|
|
|
@ -694,6 +694,7 @@ LogRelationStats(Relation rel, int elevel)
|
||||||
uint64 chunkCount = 0;
|
uint64 chunkCount = 0;
|
||||||
TupleDesc tupdesc = RelationGetDescr(rel);
|
TupleDesc tupdesc = RelationGetDescr(rel);
|
||||||
uint64 droppedChunksWithData = 0;
|
uint64 droppedChunksWithData = 0;
|
||||||
|
uint64 totalDecompressedLength = 0;
|
||||||
|
|
||||||
List *stripeList = StripesForRelfilenode(relfilenode);
|
List *stripeList = StripesForRelfilenode(relfilenode);
|
||||||
int stripeCount = list_length(stripeList);
|
int stripeCount = list_length(stripeList);
|
||||||
|
@ -723,6 +724,13 @@ LogRelationStats(Relation rel, int elevel)
|
||||||
droppedChunksWithData++;
|
droppedChunksWithData++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We don't compress the exists buffer, so its compressed & decompressed
|
||||||
|
* lengths are the same.
|
||||||
|
*/
|
||||||
|
totalDecompressedLength += skipnode->existsLength;
|
||||||
|
totalDecompressedLength += skipnode->decompressedValueSize;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -737,9 +745,14 @@ LogRelationStats(Relation rel, int elevel)
|
||||||
Datum storageId = DirectFunctionCall1(columnar_relation_storageid,
|
Datum storageId = DirectFunctionCall1(columnar_relation_storageid,
|
||||||
ObjectIdGetDatum(RelationGetRelid(rel)));
|
ObjectIdGetDatum(RelationGetRelid(rel)));
|
||||||
|
|
||||||
|
double compressionRate = totalStripeLength ?
|
||||||
|
(double) totalDecompressedLength / totalStripeLength :
|
||||||
|
1.0;
|
||||||
|
|
||||||
appendStringInfo(infoBuf, "storage id: %ld\n", DatumGetInt64(storageId));
|
appendStringInfo(infoBuf, "storage id: %ld\n", DatumGetInt64(storageId));
|
||||||
appendStringInfo(infoBuf, "total file size: %ld, total data size: %ld\n",
|
appendStringInfo(infoBuf, "total file size: %ld, total data size: %ld\n",
|
||||||
relPages * BLCKSZ, totalStripeLength);
|
relPages * BLCKSZ, totalStripeLength);
|
||||||
|
appendStringInfo(infoBuf, "compression rate: %.2fx\n", compressionRate);
|
||||||
appendStringInfo(infoBuf,
|
appendStringInfo(infoBuf,
|
||||||
"total row count: %ld, stripe count: %d, "
|
"total row count: %ld, stripe count: %d, "
|
||||||
"average rows per stripe: %ld\n",
|
"average rows per stripe: %ld\n",
|
||||||
|
|
|
@ -463,6 +463,7 @@ FlushStripe(TableWriteState *writeState)
|
||||||
chunkSkipNode->valueChunkOffset = stripeSize;
|
chunkSkipNode->valueChunkOffset = stripeSize;
|
||||||
chunkSkipNode->valueLength = valueBufferSize;
|
chunkSkipNode->valueLength = valueBufferSize;
|
||||||
chunkSkipNode->valueCompressionType = valueCompressionType;
|
chunkSkipNode->valueCompressionType = valueCompressionType;
|
||||||
|
chunkSkipNode->decompressedValueSize = chunkBuffers->decompressedValueSize;
|
||||||
|
|
||||||
stripeSize += valueBufferSize;
|
stripeSize += valueBufferSize;
|
||||||
}
|
}
|
||||||
|
@ -631,9 +632,11 @@ SerializeChunkData(TableWriteState *writeState, uint32 chunkIndex, uint32 rowCou
|
||||||
|
|
||||||
StringInfo serializedValueBuffer = chunkData->valueBufferArray[columnIndex];
|
StringInfo serializedValueBuffer = chunkData->valueBufferArray[columnIndex];
|
||||||
|
|
||||||
/* the only other supported compression type is pg_lz for now */
|
Assert(requestedCompressionType >= 0 &&
|
||||||
Assert(requestedCompressionType == COMPRESSION_NONE ||
|
requestedCompressionType < COMPRESSION_COUNT);
|
||||||
requestedCompressionType == COMPRESSION_PG_LZ);
|
|
||||||
|
chunkBuffers->decompressedValueSize =
|
||||||
|
chunkData->valueBufferArray[columnIndex]->len;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* if serializedValueBuffer can be compressed, update serializedValueBuffer
|
* if serializedValueBuffer can be compressed, update serializedValueBuffer
|
||||||
|
@ -644,7 +647,7 @@ SerializeChunkData(TableWriteState *writeState, uint32 chunkIndex, uint32 rowCou
|
||||||
if (compressed)
|
if (compressed)
|
||||||
{
|
{
|
||||||
serializedValueBuffer = compressionBuffer;
|
serializedValueBuffer = compressionBuffer;
|
||||||
actualCompressionType = COMPRESSION_PG_LZ;
|
actualCompressionType = requestedCompressionType;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* store (compressed) value buffer */
|
/* store (compressed) value buffer */
|
||||||
|
|
|
@ -41,6 +41,7 @@ CREATE TABLE cstore_skipnodes (
|
||||||
exists_stream_offset bigint NOT NULL,
|
exists_stream_offset bigint NOT NULL,
|
||||||
exists_stream_length bigint NOT NULL,
|
exists_stream_length bigint NOT NULL,
|
||||||
value_compression_type int NOT NULL,
|
value_compression_type int NOT NULL,
|
||||||
|
value_decompressed_length bigint NOT NULL,
|
||||||
PRIMARY KEY (storageid, stripe, attr, chunk),
|
PRIMARY KEY (storageid, stripe, attr, chunk),
|
||||||
FOREIGN KEY (storageid, stripe) REFERENCES cstore_stripes(storageid, stripe) ON DELETE CASCADE
|
FOREIGN KEY (storageid, stripe) REFERENCES cstore_stripes(storageid, stripe) ON DELETE CASCADE
|
||||||
) WITH (user_catalog_table = true);
|
) WITH (user_catalog_table = true);
|
||||||
|
|
|
@ -117,6 +117,12 @@ typedef struct ColumnChunkSkipNode
|
||||||
uint64 existsChunkOffset;
|
uint64 existsChunkOffset;
|
||||||
uint64 existsLength;
|
uint64 existsLength;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This is used for (1) determining destination size when decompressing,
|
||||||
|
* (2) calculating compression rates when logging stats.
|
||||||
|
*/
|
||||||
|
uint64 decompressedValueSize;
|
||||||
|
|
||||||
CompressionType valueCompressionType;
|
CompressionType valueCompressionType;
|
||||||
} ColumnChunkSkipNode;
|
} ColumnChunkSkipNode;
|
||||||
|
|
||||||
|
@ -170,6 +176,7 @@ typedef struct ColumnChunkBuffers
|
||||||
StringInfo existsBuffer;
|
StringInfo existsBuffer;
|
||||||
StringInfo valueBuffer;
|
StringInfo valueBuffer;
|
||||||
CompressionType valueCompressionType;
|
CompressionType valueCompressionType;
|
||||||
|
uint64 decompressedValueSize;
|
||||||
} ColumnChunkBuffers;
|
} ColumnChunkBuffers;
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -70,6 +70,7 @@ vacuum verbose t_compressed;
|
||||||
INFO: statistics for "t_compressed":
|
INFO: statistics for "t_compressed":
|
||||||
storage id: -1
|
storage id: -1
|
||||||
total file size: 0, total data size: 0
|
total file size: 0, total data size: 0
|
||||||
|
compression rate: 1.00x
|
||||||
total row count: 0, stripe count: 0, average rows per stripe: 0
|
total row count: 0, stripe count: 0, average rows per stripe: 0
|
||||||
chunk count: 0, containing data for dropped columns: 0, none compressed: 0, pglz compressed: 0
|
chunk count: 0, containing data for dropped columns: 0, none compressed: 0, pglz compressed: 0
|
||||||
|
|
||||||
|
@ -77,6 +78,7 @@ vacuum verbose t_uncompressed;
|
||||||
INFO: statistics for "t_uncompressed":
|
INFO: statistics for "t_uncompressed":
|
||||||
storage id: -1
|
storage id: -1
|
||||||
total file size: 0, total data size: 0
|
total file size: 0, total data size: 0
|
||||||
|
compression rate: 1.00x
|
||||||
total row count: 0, stripe count: 0, average rows per stripe: 0
|
total row count: 0, stripe count: 0, average rows per stripe: 0
|
||||||
chunk count: 0, containing data for dropped columns: 0, none compressed: 0, pglz compressed: 0
|
chunk count: 0, containing data for dropped columns: 0, none compressed: 0, pglz compressed: 0
|
||||||
|
|
||||||
|
|
|
@ -150,6 +150,7 @@ VACUUM VERBOSE t;
|
||||||
INFO: statistics for "t":
|
INFO: statistics for "t":
|
||||||
storage id: xxxxx
|
storage id: xxxxx
|
||||||
total file size: 122880, total data size: 10754
|
total file size: 122880, total data size: 10754
|
||||||
|
compression rate: 1.00x
|
||||||
total row count: 2530, stripe count: 3, average rows per stripe: 843
|
total row count: 2530, stripe count: 3, average rows per stripe: 843
|
||||||
chunk count: 3, containing data for dropped columns: 0, none compressed: 3, pglz compressed: 0
|
chunk count: 3, containing data for dropped columns: 0, none compressed: 3, pglz compressed: 0
|
||||||
|
|
||||||
|
@ -197,6 +198,7 @@ VACUUM VERBOSE t;
|
||||||
INFO: statistics for "t":
|
INFO: statistics for "t":
|
||||||
storage id: xxxxx
|
storage id: xxxxx
|
||||||
total file size: 57344, total data size: 18808
|
total file size: 57344, total data size: 18808
|
||||||
|
compression rate: 1.25x
|
||||||
total row count: 5530, stripe count: 5, average rows per stripe: 1106
|
total row count: 5530, stripe count: 5, average rows per stripe: 1106
|
||||||
chunk count: 7, containing data for dropped columns: 0, none compressed: 5, pglz compressed: 2
|
chunk count: 7, containing data for dropped columns: 0, none compressed: 5, pglz compressed: 2
|
||||||
|
|
||||||
|
@ -214,6 +216,7 @@ VACUUM VERBOSE t;
|
||||||
INFO: statistics for "t":
|
INFO: statistics for "t":
|
||||||
storage id: xxxxx
|
storage id: xxxxx
|
||||||
total file size: 73728, total data size: 31372
|
total file size: 73728, total data size: 31372
|
||||||
|
compression rate: 1.15x
|
||||||
total row count: 7030, stripe count: 6, average rows per stripe: 1171
|
total row count: 7030, stripe count: 6, average rows per stripe: 1171
|
||||||
chunk count: 11, containing data for dropped columns: 2, none compressed: 9, pglz compressed: 2
|
chunk count: 11, containing data for dropped columns: 2, none compressed: 9, pglz compressed: 2
|
||||||
|
|
||||||
|
@ -231,6 +234,7 @@ VACUUM VERBOSE t;
|
||||||
INFO: statistics for "t":
|
INFO: statistics for "t":
|
||||||
storage id: xxxxx
|
storage id: xxxxx
|
||||||
total file size: 57344, total data size: 15728
|
total file size: 57344, total data size: 15728
|
||||||
|
compression rate: 1.96x
|
||||||
total row count: 7030, stripe count: 4, average rows per stripe: 1757
|
total row count: 7030, stripe count: 4, average rows per stripe: 1757
|
||||||
chunk count: 8, containing data for dropped columns: 0, none compressed: 2, pglz compressed: 6
|
chunk count: 8, containing data for dropped columns: 0, none compressed: 2, pglz compressed: 6
|
||||||
|
|
||||||
|
@ -243,3 +247,18 @@ SELECT count(distinct storageid) - :columnar_table_count FROM cstore.cstore_stri
|
||||||
0
|
0
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- A table with high compression ratio
|
||||||
|
SET cstore.compression TO 'pglz';
|
||||||
|
SET cstore.stripe_row_count TO 1000000;
|
||||||
|
SET cstore.chunk_row_count TO 100000;
|
||||||
|
CREATE TABLE t(a int, b char, c text) USING columnar;
|
||||||
|
INSERT INTO t SELECT 1, 'a', 'xyz' FROM generate_series(1, 1000000) i;
|
||||||
|
VACUUM VERBOSE t;
|
||||||
|
INFO: statistics for "t":
|
||||||
|
storage id: xxxxx
|
||||||
|
total file size: 630784, total data size: 604480
|
||||||
|
compression rate: 33.71x
|
||||||
|
total row count: 1000000, stripe count: 1, average rows per stripe: 1000000
|
||||||
|
chunk count: 30, containing data for dropped columns: 0, none compressed: 0, pglz compressed: 30
|
||||||
|
|
||||||
|
DROP TABLE t;
|
||||||
|
|
|
@ -13,6 +13,7 @@ step s1-insert:
|
||||||
s2: INFO: statistics for "test_vacuum_vs_insert":
|
s2: INFO: statistics for "test_vacuum_vs_insert":
|
||||||
storage id: xxxxx
|
storage id: xxxxx
|
||||||
total file size: 24576, total data size: 26
|
total file size: 24576, total data size: 26
|
||||||
|
compression rate: 1.00x
|
||||||
total row count: 3, stripe count: 1, average rows per stripe: 3
|
total row count: 3, stripe count: 1, average rows per stripe: 3
|
||||||
chunk count: 2, containing data for dropped columns: 0, none compressed: 2, pglz compressed: 0
|
chunk count: 2, containing data for dropped columns: 0, none compressed: 2, pglz compressed: 0
|
||||||
|
|
||||||
|
|
|
@ -111,6 +111,7 @@ VACUUM VERBOSE test_options_1;
|
||||||
INFO: statistics for "test_options_1":
|
INFO: statistics for "test_options_1":
|
||||||
storage id: xxxxx
|
storage id: xxxxx
|
||||||
total file size: 65536, total data size: 43136
|
total file size: 65536, total data size: 43136
|
||||||
|
compression rate: 1.91x
|
||||||
total row count: 10000, stripe count: 2, average rows per stripe: 5000
|
total row count: 10000, stripe count: 2, average rows per stripe: 5000
|
||||||
chunk count: 20, containing data for dropped columns: 0, none compressed: 10, pglz compressed: 10
|
chunk count: 20, containing data for dropped columns: 0, none compressed: 10, pglz compressed: 10
|
||||||
|
|
||||||
|
@ -130,6 +131,7 @@ VACUUM VERBOSE test_options_2;
|
||||||
INFO: statistics for "test_options_2":
|
INFO: statistics for "test_options_2":
|
||||||
storage id: xxxxx
|
storage id: xxxxx
|
||||||
total file size: 163840, total data size: 125636
|
total file size: 163840, total data size: 125636
|
||||||
|
compression rate: 1.31x
|
||||||
total row count: 20000, stripe count: 4, average rows per stripe: 5000
|
total row count: 20000, stripe count: 4, average rows per stripe: 5000
|
||||||
chunk count: 30, containing data for dropped columns: 0, none compressed: 20, pglz compressed: 10
|
chunk count: 30, containing data for dropped columns: 0, none compressed: 20, pglz compressed: 10
|
||||||
|
|
||||||
|
|
|
@ -111,3 +111,14 @@ DROP VIEW t_stripes;
|
||||||
|
|
||||||
-- Make sure we cleaned the metadata for t too
|
-- Make sure we cleaned the metadata for t too
|
||||||
SELECT count(distinct storageid) - :columnar_table_count FROM cstore.cstore_stripes;
|
SELECT count(distinct storageid) - :columnar_table_count FROM cstore.cstore_stripes;
|
||||||
|
|
||||||
|
-- A table with high compression ratio
|
||||||
|
SET cstore.compression TO 'pglz';
|
||||||
|
SET cstore.stripe_row_count TO 1000000;
|
||||||
|
SET cstore.chunk_row_count TO 100000;
|
||||||
|
CREATE TABLE t(a int, b char, c text) USING columnar;
|
||||||
|
INSERT INTO t SELECT 1, 'a', 'xyz' FROM generate_series(1, 1000000) i;
|
||||||
|
|
||||||
|
VACUUM VERBOSE t;
|
||||||
|
|
||||||
|
DROP TABLE t;
|
||||||
|
|
Loading…
Reference in New Issue