mirror of https://github.com/citusdata/citus.git
Account for worst compress sizes before allocating a new chunk
- When compression is enable, in case for a worst compression input_data_size < compressed_data_size. This will increease the data length and again will cause enlargeStringInfo() failures. - We should also account for this change before allocation/deciding a new chunk group. GetMaxCompressedLength() will help us calculating the expected worst compressed sizes before hand.pull/8202/head
parent
b0dcc11501
commit
292247e2cc
|
|
@ -270,3 +270,45 @@ DecompressBuffer(StringInfo buffer,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Return worst-case compressed size for the given input size and
|
||||||
|
* compression type. For unsupported types, return the input size.
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
GetMaxCompressedLength(int size, CompressionType compressionType)
|
||||||
|
{
|
||||||
|
Assert(compressionType >= 0 && compressionType < COMPRESSION_COUNT);
|
||||||
|
|
||||||
|
switch (compressionType)
|
||||||
|
{
|
||||||
|
case COMPRESSION_NONE:
|
||||||
|
{
|
||||||
|
return size;
|
||||||
|
}
|
||||||
|
#if HAVE_CITUS_LIBLZ4
|
||||||
|
case COMPRESSION_LZ4:
|
||||||
|
{
|
||||||
|
return LZ4_compressBound(size);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#if HAVE_LIBZSTD
|
||||||
|
case COMPRESSION_ZSTD:
|
||||||
|
{
|
||||||
|
return (int) ZSTD_compressBound(size);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
case COMPRESSION_PG_LZ:
|
||||||
|
{
|
||||||
|
return (int) (PGLZ_MAX_OUTPUT(size) + COLUMNAR_COMPRESS_HDRSZ);
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("unexpected compression type: %d", compressionType)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -184,6 +184,7 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu
|
||||||
const uint32 maxChunkCount = (options->stripeRowCount / chunkRowCount) + 1;
|
const uint32 maxChunkCount = (options->stripeRowCount / chunkRowCount) + 1;
|
||||||
ChunkData *chunkData = writeState->chunkData;
|
ChunkData *chunkData = writeState->chunkData;
|
||||||
MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext);
|
MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext);
|
||||||
|
bool shouldSerializeEarly = false;
|
||||||
|
|
||||||
if (stripeBuffers == NULL)
|
if (stripeBuffers == NULL)
|
||||||
{
|
{
|
||||||
|
|
@ -255,16 +256,28 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Check if we need to serialize a chunk group earliar due to size limits.
|
* Check if we need to serialize a chunk group earliar due to size limits.
|
||||||
|
* We also need to account to worst case copressed data size that can
|
||||||
|
* also exceed the limits.
|
||||||
|
*/
|
||||||
|
if (chunkRowIndex > 0)
|
||||||
|
{
|
||||||
|
int64 chunkGroupLimit = CHUNK_GROUP_SIZE_MB_TO_BYTES(options->maxChunkSize);
|
||||||
|
int64 maxCompressedSize = GetMaxCompressedLength(writeState->currentChunkBytes,
|
||||||
|
writeState->options.compressionType);
|
||||||
|
|
||||||
|
shouldSerializeEarly = (maxCompressedSize + totalRowSize > chunkGroupLimit);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
* If adding the current row spills out from the defined chunk grupu size limit, we
|
* If adding the current row spills out from the defined chunk grupu size limit, we
|
||||||
* will then add the current row in a seperate chunk and will serialize
|
* will then add the current row in a seperate chunk and will serialize
|
||||||
* all rows data before it.
|
* all rows data before it.
|
||||||
*/
|
*/
|
||||||
if (chunkRowIndex > 0 &&
|
if (shouldSerializeEarly)
|
||||||
writeState->currentChunkBytes + totalRowSize > CHUNK_GROUP_SIZE_MB_TO_BYTES(options->maxChunkSize))
|
|
||||||
{
|
{
|
||||||
elog(DEBUG1, "Row size (%zu bytes) exceeds chunk group size limit (%zu bytes), "
|
elog(DEBUG1, "Row size (%zu bytes) exceeds chunk group size limit (%zu bytes), "
|
||||||
"storing in a separate chunk group",
|
"storing in a separate chunk group",
|
||||||
totalRowSize, CHUNK_GROUP_SIZE_MB_TO_BYTES(options->maxChunkSize));
|
totalRowSize, CHUNK_GROUP_SIZE_MB_TO_BYTES(options->maxChunkSize));
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Before putting row in a seperate chunk we have to allocate space
|
* Before putting row in a seperate chunk we have to allocate space
|
||||||
|
|
|
||||||
|
|
@ -30,5 +30,5 @@ extern bool CompressBuffer(StringInfo inputBuffer,
|
||||||
int compressionLevel);
|
int compressionLevel);
|
||||||
extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType,
|
extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType,
|
||||||
uint64 decompressedSize);
|
uint64 decompressedSize);
|
||||||
|
extern int GetMaxCompressedLength(int size, CompressionType compressionType);
|
||||||
#endif /* COLUMNAR_COMPRESSION_H */
|
#endif /* COLUMNAR_COMPRESSION_H */
|
||||||
|
|
|
||||||
|
|
@ -36,61 +36,41 @@ DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes
|
||||||
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
|
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
|
||||||
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
|
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
|
||||||
DEBUG: Flushing Stripe of size 600
|
DEBUG: Flushing Stripe of size 600
|
||||||
|
-- test VACUUM FULL
|
||||||
|
VACUUM FULL test_oversized_row;
|
||||||
|
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
|
||||||
|
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
|
||||||
|
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
|
||||||
|
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
|
||||||
|
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
|
||||||
|
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
|
||||||
|
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
|
||||||
|
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
|
||||||
|
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
|
||||||
|
DEBUG: Flushing Stripe of size 600
|
||||||
|
SET client_min_messages TO warning;
|
||||||
|
-- try verifying the data integrity
|
||||||
SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass;
|
SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass;
|
||||||
relation | storage_id | stripe_num | chunk_group_num | row_count
|
relation | storage_id | stripe_num | chunk_group_num | row_count
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
test_oversized_row | 10000000261 | 1 | 0 | 63
|
test_oversized_row | 10000000262 | 1 | 0 | 63
|
||||||
test_oversized_row | 10000000261 | 1 | 1 | 63
|
test_oversized_row | 10000000262 | 1 | 1 | 63
|
||||||
test_oversized_row | 10000000261 | 1 | 2 | 63
|
test_oversized_row | 10000000262 | 1 | 2 | 63
|
||||||
test_oversized_row | 10000000261 | 1 | 3 | 63
|
test_oversized_row | 10000000262 | 1 | 3 | 63
|
||||||
test_oversized_row | 10000000261 | 1 | 4 | 63
|
test_oversized_row | 10000000262 | 1 | 4 | 63
|
||||||
test_oversized_row | 10000000261 | 1 | 5 | 63
|
test_oversized_row | 10000000262 | 1 | 5 | 63
|
||||||
test_oversized_row | 10000000261 | 1 | 6 | 63
|
test_oversized_row | 10000000262 | 1 | 6 | 63
|
||||||
test_oversized_row | 10000000261 | 1 | 7 | 63
|
test_oversized_row | 10000000262 | 1 | 7 | 63
|
||||||
test_oversized_row | 10000000261 | 1 | 8 | 63
|
test_oversized_row | 10000000262 | 1 | 8 | 63
|
||||||
test_oversized_row | 10000000261 | 1 | 9 | 33
|
test_oversized_row | 10000000262 | 1 | 9 | 33
|
||||||
(10 rows)
|
(10 rows)
|
||||||
|
|
||||||
SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass;
|
SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass;
|
||||||
relation | storage_id | stripe_num | file_offset | data_length | column_count | chunk_row_count | row_count | chunk_group_count | first_row_number
|
relation | storage_id | stripe_num | file_offset | data_length | column_count | chunk_row_count | row_count | chunk_group_count | first_row_number
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
test_oversized_row | 10000000261 | 1 | 16336 | 1258296154 | 2 | 1000 | 600 | 10 | 1
|
test_oversized_row | 10000000262 | 1 | 16336 | 1258296154 | 2 | 1000 | 600 | 10 | 1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- test edge case setting chunk_group_size_limit = 1024
|
|
||||||
DROP TABLE test_oversized_row;
|
|
||||||
CREATE TABLE test_oversized_row (
|
|
||||||
id INTEGER,
|
|
||||||
huge_text TEXT
|
|
||||||
) USING columnar WITH (
|
|
||||||
columnar.chunk_group_row_limit = 1000,
|
|
||||||
columnar.stripe_row_limit = 5000,
|
|
||||||
columnar.chunk_group_size_limit = 1024
|
|
||||||
);
|
|
||||||
INSERT INTO test_oversized_row
|
|
||||||
SELECT gs, repeat('Y', 2*1024*1024) -- 2 MB text
|
|
||||||
FROM generate_series(1, 600) AS gs;
|
|
||||||
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (1073741824 bytes), storing in a separate chunk group
|
|
||||||
DEBUG: Flushing Stripe of size 600
|
|
||||||
SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass;
|
|
||||||
relation | storage_id | stripe_num | chunk_group_num | row_count
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
test_oversized_row | 10000000262 | 1 | 0 | 511
|
|
||||||
test_oversized_row | 10000000262 | 1 | 1 | 89
|
|
||||||
(2 rows)
|
|
||||||
|
|
||||||
SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass;
|
|
||||||
relation | storage_id | stripe_num | file_offset | data_length | column_count | chunk_row_count | row_count | chunk_group_count | first_row_number
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
test_oversized_row | 10000000262 | 1 | 16336 | 1258296152 | 2 | 1000 | 600 | 2 | 1
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
-- test VACUUM FULL
|
|
||||||
VACUUM FULL test_oversized_row;
|
|
||||||
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (1073741824 bytes), storing in a separate chunk group
|
|
||||||
DEBUG: Flushing Stripe of size 600
|
|
||||||
SET client_min_messages TO warning;
|
|
||||||
-- try verifying the data integrity
|
|
||||||
SELECT COUNT(*) FROM test_oversized_row;
|
SELECT COUNT(*) FROM test_oversized_row;
|
||||||
count
|
count
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
@ -112,11 +92,10 @@ SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10;
|
||||||
10 | 2097152
|
10 | 2097152
|
||||||
(10 rows)
|
(10 rows)
|
||||||
|
|
||||||
-- total size should be greater 1GB (1258291200 bytes)
|
SELECT SUM(LENGTH(huge_text)) = 1258291200 AS is_equal FROM test_oversized_row;
|
||||||
SELECT SUM(LENGTH(huge_text)) AS total_size FROM test_oversized_row;
|
is_equal
|
||||||
total_size
|
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
1258291200
|
t
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
\dt+ test_oversized_row
|
\dt+ test_oversized_row
|
||||||
|
|
@ -126,5 +105,71 @@ SELECT SUM(LENGTH(huge_text)) AS total_size FROM test_oversized_row;
|
||||||
columnar_chunk_test | test_oversized_row | table | postgres | permanent | 1204 MB |
|
columnar_chunk_test | test_oversized_row | table | postgres | permanent | 1204 MB |
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- test edge case setting chunk_group_size_limit = 1024
|
||||||
|
DROP TABLE test_oversized_row;
|
||||||
|
SET client_min_messages TO debug1;
|
||||||
|
SET columnar.compression TO default;
|
||||||
|
CREATE TABLE test_oversized_row (
|
||||||
|
id INTEGER,
|
||||||
|
huge_text TEXT
|
||||||
|
) USING columnar WITH (
|
||||||
|
columnar.chunk_group_row_limit = 1000,
|
||||||
|
columnar.stripe_row_limit = 5000,
|
||||||
|
columnar.chunk_group_size_limit = 1024
|
||||||
|
);
|
||||||
|
INSERT INTO test_oversized_row
|
||||||
|
SELECT gs, repeat('Y', 2*1024*1024) -- 2 MB text
|
||||||
|
FROM generate_series(1, 600) AS gs;
|
||||||
|
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (1073741824 bytes), storing in a separate chunk group
|
||||||
|
DEBUG: Flushing Stripe of size 600
|
||||||
|
SET client_min_messages TO warning;
|
||||||
|
-- try verifying the data integrity
|
||||||
|
SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass;
|
||||||
|
relation | storage_id | stripe_num | chunk_group_num | row_count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
test_oversized_row | 10000000263 | 1 | 0 | 510
|
||||||
|
test_oversized_row | 10000000263 | 1 | 1 | 90
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass;
|
||||||
|
relation | storage_id | stripe_num | file_offset | data_length | column_count | chunk_row_count | row_count | chunk_group_count | first_row_number
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
test_oversized_row | 10000000263 | 1 | 16336 | 49278 | 2 | 1000 | 600 | 2 | 1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT COUNT(*) FROM test_oversized_row;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
600
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10;
|
||||||
|
id | length
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 2097152
|
||||||
|
2 | 2097152
|
||||||
|
3 | 2097152
|
||||||
|
4 | 2097152
|
||||||
|
5 | 2097152
|
||||||
|
6 | 2097152
|
||||||
|
7 | 2097152
|
||||||
|
8 | 2097152
|
||||||
|
9 | 2097152
|
||||||
|
10 | 2097152
|
||||||
|
(10 rows)
|
||||||
|
|
||||||
|
SELECT SUM(LENGTH(huge_text)) = 1258291200 AS is_equal FROM test_oversized_row;
|
||||||
|
is_equal
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\dt+ test_oversized_row
|
||||||
|
List of relations
|
||||||
|
Schema | Name | Type | Owner | Persistence | Size | Description
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
columnar_chunk_test | test_oversized_row | table | postgres | permanent | 72 kB |
|
||||||
|
(1 row)
|
||||||
|
|
||||||
DROP TABLE test_oversized_row;
|
DROP TABLE test_oversized_row;
|
||||||
DROP SCHEMA columnar_chunk_test CASCADE;
|
DROP SCHEMA columnar_chunk_test CASCADE;
|
||||||
|
|
|
||||||
|
|
@ -33,11 +33,24 @@ INSERT INTO test_oversized_row
|
||||||
SELECT gs, repeat('Y', 2*1024*1024) -- 2 MB text
|
SELECT gs, repeat('Y', 2*1024*1024) -- 2 MB text
|
||||||
FROM generate_series(1, 600) AS gs;
|
FROM generate_series(1, 600) AS gs;
|
||||||
|
|
||||||
|
-- test VACUUM FULL
|
||||||
|
VACUUM FULL test_oversized_row;
|
||||||
|
|
||||||
|
SET client_min_messages TO warning;
|
||||||
|
|
||||||
|
-- try verifying the data integrity
|
||||||
SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass;
|
SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass;
|
||||||
SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass;
|
SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass;
|
||||||
|
SELECT COUNT(*) FROM test_oversized_row;
|
||||||
|
SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10;
|
||||||
|
SELECT SUM(LENGTH(huge_text)) = 1258291200 AS is_equal FROM test_oversized_row;
|
||||||
|
\dt+ test_oversized_row
|
||||||
|
|
||||||
|
|
||||||
-- test edge case setting chunk_group_size_limit = 1024
|
-- test edge case setting chunk_group_size_limit = 1024
|
||||||
DROP TABLE test_oversized_row;
|
DROP TABLE test_oversized_row;
|
||||||
|
SET client_min_messages TO debug1;
|
||||||
|
SET columnar.compression TO default;
|
||||||
|
|
||||||
CREATE TABLE test_oversized_row (
|
CREATE TABLE test_oversized_row (
|
||||||
id INTEGER,
|
id INTEGER,
|
||||||
|
|
@ -52,21 +65,14 @@ INSERT INTO test_oversized_row
|
||||||
SELECT gs, repeat('Y', 2*1024*1024) -- 2 MB text
|
SELECT gs, repeat('Y', 2*1024*1024) -- 2 MB text
|
||||||
FROM generate_series(1, 600) AS gs;
|
FROM generate_series(1, 600) AS gs;
|
||||||
|
|
||||||
SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass;
|
|
||||||
SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass;
|
|
||||||
|
|
||||||
-- test VACUUM FULL
|
|
||||||
VACUUM FULL test_oversized_row;
|
|
||||||
|
|
||||||
SET client_min_messages TO warning;
|
SET client_min_messages TO warning;
|
||||||
|
|
||||||
-- try verifying the data integrity
|
-- try verifying the data integrity
|
||||||
|
SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass;
|
||||||
|
SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass;
|
||||||
SELECT COUNT(*) FROM test_oversized_row;
|
SELECT COUNT(*) FROM test_oversized_row;
|
||||||
SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10;
|
SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10;
|
||||||
|
SELECT SUM(LENGTH(huge_text)) = 1258291200 AS is_equal FROM test_oversized_row;
|
||||||
-- total size should be greater 1GB (1258291200 bytes)
|
|
||||||
SELECT SUM(LENGTH(huge_text)) AS total_size FROM test_oversized_row;
|
|
||||||
|
|
||||||
\dt+ test_oversized_row
|
\dt+ test_oversized_row
|
||||||
|
|
||||||
DROP TABLE test_oversized_row;
|
DROP TABLE test_oversized_row;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue