diff --git a/src/backend/columnar/columnar_compression.c b/src/backend/columnar/columnar_compression.c index 2ff35da98..4f955f6eb 100644 --- a/src/backend/columnar/columnar_compression.c +++ b/src/backend/columnar/columnar_compression.c @@ -270,3 +270,45 @@ DecompressBuffer(StringInfo buffer, } } } + + +/* + * Return worst-case compressed size for the given input size and + * compression type. Errors out for unexpected or unsupported types. + */ +int +GetMaxCompressedLength(int size, CompressionType compressionType) +{ + Assert(compressionType >= 0 && compressionType < COMPRESSION_COUNT); + + switch (compressionType) + { + case COMPRESSION_NONE: + { + return size; + } +#if HAVE_CITUS_LIBLZ4 + case COMPRESSION_LZ4: + { + return LZ4_compressBound(size); + } +#endif + +#if HAVE_LIBZSTD + case COMPRESSION_ZSTD: + { + return (int) ZSTD_compressBound(size); + } +#endif + + case COMPRESSION_PG_LZ: + { + return (int) (PGLZ_MAX_OUTPUT(size) + COLUMNAR_COMPRESS_HDRSZ); + } + + default: + { + ereport(ERROR, (errmsg("unexpected compression type: %d", compressionType))); + } + } +} \ No newline at end of file diff --git a/src/backend/columnar/columnar_writer.c b/src/backend/columnar/columnar_writer.c index 3158c982d..fbdbc1d5e 100644 --- a/src/backend/columnar/columnar_writer.c +++ b/src/backend/columnar/columnar_writer.c @@ -184,6 +184,7 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu const uint32 maxChunkCount = (options->stripeRowCount / chunkRowCount) + 1; ChunkData *chunkData = writeState->chunkData; MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext); + bool shouldSerializeEarly = false; if (stripeBuffers == NULL) { @@ -255,16 +256,28 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu /* * Check if we need to serialize a chunk group earliar due to size limits. + * We also need to account for the worst-case compressed data size that can + * also exceed the limits. 
+ */ + if (chunkRowIndex > 0) + { + int64 chunkGroupLimit = CHUNK_GROUP_SIZE_MB_TO_BYTES(options->maxChunkSize); + int64 maxCompressedSize = GetMaxCompressedLength(writeState->currentChunkBytes, + writeState->options.compressionType); + + shouldSerializeEarly = (maxCompressedSize + totalRowSize > chunkGroupLimit); + } + + /* * If adding the current row spills out from the defined chunk grupu size limit, we * will then add the current row in a seperate chunk and will serialize * all rows data before it. */ - if (chunkRowIndex > 0 && - writeState->currentChunkBytes + totalRowSize > CHUNK_GROUP_SIZE_MB_TO_BYTES(options->maxChunkSize)) + if (shouldSerializeEarly) { elog(DEBUG1, "Row size (%zu bytes) exceeds chunk group size limit (%zu bytes), " - "storing in a separate chunk group", - totalRowSize, CHUNK_GROUP_SIZE_MB_TO_BYTES(options->maxChunkSize)); + "storing in a separate chunk group", + totalRowSize, CHUNK_GROUP_SIZE_MB_TO_BYTES(options->maxChunkSize)); /* * Before putting row in a seperate chunk we have to allocate space diff --git a/src/include/columnar/columnar_compression.h b/src/include/columnar/columnar_compression.h index 5b9710f20..d93d3fd97 100644 --- a/src/include/columnar/columnar_compression.h +++ b/src/include/columnar/columnar_compression.h @@ -30,5 +30,5 @@ extern bool CompressBuffer(StringInfo inputBuffer, int compressionLevel); extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType, uint64 decompressedSize); - +extern int GetMaxCompressedLength(int size, CompressionType compressionType); #endif /* COLUMNAR_COMPRESSION_H */ diff --git a/src/test/regress/expected/columnar_chunk_sizes.out b/src/test/regress/expected/columnar_chunk_sizes.out index 41bd82f2b..7a18b898c 100644 --- a/src/test/regress/expected/columnar_chunk_sizes.out +++ b/src/test/regress/expected/columnar_chunk_sizes.out @@ -36,61 +36,41 @@ DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes DEBUG: Row size (2097160 bytes) 
exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group DEBUG: Flushing Stripe of size 600 +-- test VACUUM FULL +VACUUM FULL test_oversized_row; +DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group +DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group +DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group +DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group +DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group +DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group +DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group +DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group +DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group +DEBUG: Flushing Stripe of size 600 +SET client_min_messages TO warning; +-- try verifying the data integrity SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass; relation | storage_id | stripe_num | chunk_group_num | row_count --------------------------------------------------------------------- - test_oversized_row | 10000000261 | 1 | 0 | 63 - test_oversized_row | 10000000261 | 1 | 1 | 63 - test_oversized_row | 10000000261 | 1 | 2 | 63 - test_oversized_row | 10000000261 | 1 | 3 | 63 - test_oversized_row | 10000000261 | 1 | 4 | 63 - test_oversized_row | 10000000261 | 1 | 5 | 63 - test_oversized_row | 10000000261 | 1 | 
6 | 63 - test_oversized_row | 10000000261 | 1 | 7 | 63 - test_oversized_row | 10000000261 | 1 | 8 | 63 - test_oversized_row | 10000000261 | 1 | 9 | 33 + test_oversized_row | 10000000262 | 1 | 0 | 63 + test_oversized_row | 10000000262 | 1 | 1 | 63 + test_oversized_row | 10000000262 | 1 | 2 | 63 + test_oversized_row | 10000000262 | 1 | 3 | 63 + test_oversized_row | 10000000262 | 1 | 4 | 63 + test_oversized_row | 10000000262 | 1 | 5 | 63 + test_oversized_row | 10000000262 | 1 | 6 | 63 + test_oversized_row | 10000000262 | 1 | 7 | 63 + test_oversized_row | 10000000262 | 1 | 8 | 63 + test_oversized_row | 10000000262 | 1 | 9 | 33 (10 rows) SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass; relation | storage_id | stripe_num | file_offset | data_length | column_count | chunk_row_count | row_count | chunk_group_count | first_row_number --------------------------------------------------------------------- - test_oversized_row | 10000000261 | 1 | 16336 | 1258296154 | 2 | 1000 | 600 | 10 | 1 + test_oversized_row | 10000000262 | 1 | 16336 | 1258296154 | 2 | 1000 | 600 | 10 | 1 (1 row) --- test edge case setting chunk_group_size_limit = 1024 -DROP TABLE test_oversized_row; -CREATE TABLE test_oversized_row ( - id INTEGER, - huge_text TEXT -) USING columnar WITH ( - columnar.chunk_group_row_limit = 1000, - columnar.stripe_row_limit = 5000, - columnar.chunk_group_size_limit = 1024 -); -INSERT INTO test_oversized_row -SELECT gs, repeat('Y', 2*1024*1024) -- 2 MB text -FROM generate_series(1, 600) AS gs; -DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (1073741824 bytes), storing in a separate chunk group -DEBUG: Flushing Stripe of size 600 -SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass; - relation | storage_id | stripe_num | chunk_group_num | row_count ---------------------------------------------------------------------- - test_oversized_row | 10000000262 | 1 | 0 | 511 - test_oversized_row | 10000000262 
| 1 | 1 | 89 -(2 rows) - -SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass; - relation | storage_id | stripe_num | file_offset | data_length | column_count | chunk_row_count | row_count | chunk_group_count | first_row_number ---------------------------------------------------------------------- - test_oversized_row | 10000000262 | 1 | 16336 | 1258296152 | 2 | 1000 | 600 | 2 | 1 -(1 row) - --- test VACUUM FULL -VACUUM FULL test_oversized_row; -DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (1073741824 bytes), storing in a separate chunk group -DEBUG: Flushing Stripe of size 600 -SET client_min_messages TO warning; --- try verifying the data integrity SELECT COUNT(*) FROM test_oversized_row; count --------------------------------------------------------------------- @@ -112,11 +92,10 @@ SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10; 10 | 2097152 (10 rows) --- total size should be greater 1GB (1258291200 bytes) -SELECT SUM(LENGTH(huge_text)) AS total_size FROM test_oversized_row; - total_size +SELECT SUM(LENGTH(huge_text)) = 1258291200 AS is_equal FROM test_oversized_row; + is_equal --------------------------------------------------------------------- - 1258291200 + t (1 row) \dt+ test_oversized_row @@ -126,5 +105,71 @@ SELECT SUM(LENGTH(huge_text)) AS total_size FROM test_oversized_row; columnar_chunk_test | test_oversized_row | table | postgres | permanent | 1204 MB | (1 row) +-- test edge case setting chunk_group_size_limit = 1024 +DROP TABLE test_oversized_row; +SET client_min_messages TO debug1; +SET columnar.compression TO default; +CREATE TABLE test_oversized_row ( + id INTEGER, + huge_text TEXT +) USING columnar WITH ( + columnar.chunk_group_row_limit = 1000, + columnar.stripe_row_limit = 5000, + columnar.chunk_group_size_limit = 1024 +); +INSERT INTO test_oversized_row +SELECT gs, repeat('Y', 2*1024*1024) -- 2 MB text +FROM generate_series(1, 600) AS gs; +DEBUG: Row size (2097160 bytes) 
exceeds chunk group size limit (1073741824 bytes), storing in a separate chunk group +DEBUG: Flushing Stripe of size 600 +SET client_min_messages TO warning; +-- try verifying the data integrity +SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass; + relation | storage_id | stripe_num | chunk_group_num | row_count +--------------------------------------------------------------------- + test_oversized_row | 10000000263 | 1 | 0 | 510 + test_oversized_row | 10000000263 | 1 | 1 | 90 +(2 rows) + +SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass; + relation | storage_id | stripe_num | file_offset | data_length | column_count | chunk_row_count | row_count | chunk_group_count | first_row_number +--------------------------------------------------------------------- + test_oversized_row | 10000000263 | 1 | 16336 | 49278 | 2 | 1000 | 600 | 2 | 1 +(1 row) + +SELECT COUNT(*) FROM test_oversized_row; + count +--------------------------------------------------------------------- + 600 +(1 row) + +SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10; + id | length +--------------------------------------------------------------------- + 1 | 2097152 + 2 | 2097152 + 3 | 2097152 + 4 | 2097152 + 5 | 2097152 + 6 | 2097152 + 7 | 2097152 + 8 | 2097152 + 9 | 2097152 + 10 | 2097152 +(10 rows) + +SELECT SUM(LENGTH(huge_text)) = 1258291200 AS is_equal FROM test_oversized_row; + is_equal +--------------------------------------------------------------------- + t +(1 row) + +\dt+ test_oversized_row + List of relations + Schema | Name | Type | Owner | Persistence | Size | Description +--------------------------------------------------------------------- + columnar_chunk_test | test_oversized_row | table | postgres | permanent | 72 kB | +(1 row) + DROP TABLE test_oversized_row; DROP SCHEMA columnar_chunk_test CASCADE; diff --git a/src/test/regress/sql/columnar_chunk_sizes.sql 
b/src/test/regress/sql/columnar_chunk_sizes.sql index fb74ffbde..cb3821450 100644 --- a/src/test/regress/sql/columnar_chunk_sizes.sql +++ b/src/test/regress/sql/columnar_chunk_sizes.sql @@ -33,11 +33,24 @@ INSERT INTO test_oversized_row SELECT gs, repeat('Y', 2*1024*1024) -- 2 MB text FROM generate_series(1, 600) AS gs; +-- test VACUUM FULL +VACUUM FULL test_oversized_row; + +SET client_min_messages TO warning; + +-- try verifying the data integrity SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass; SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass; +SELECT COUNT(*) FROM test_oversized_row; +SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10; +SELECT SUM(LENGTH(huge_text)) = 1258291200 AS is_equal FROM test_oversized_row; +\dt+ test_oversized_row + -- test edge case setting chunk_group_size_limit = 1024 DROP TABLE test_oversized_row; +SET client_min_messages TO debug1; +SET columnar.compression TO default; CREATE TABLE test_oversized_row ( id INTEGER, @@ -52,21 +65,14 @@ INSERT INTO test_oversized_row SELECT gs, repeat('Y', 2*1024*1024) -- 2 MB text FROM generate_series(1, 600) AS gs; -SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass; -SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass; - --- test VACUUM FULL -VACUUM FULL test_oversized_row; - SET client_min_messages TO warning; -- try verifying the data integrity +SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass; +SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass; SELECT COUNT(*) FROM test_oversized_row; SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10; - --- total size should be greater 1GB (1258291200 bytes) -SELECT SUM(LENGTH(huge_text)) AS total_size FROM test_oversized_row; - +SELECT SUM(LENGTH(huge_text)) = 1258291200 AS is_equal FROM test_oversized_row; \dt+ test_oversized_row 
DROP TABLE test_oversized_row;