fix: Keep track of chunk indices instead of recomputing & adjusting them

There were still some issues in adjusting the chunk index and chunk row index
after every computation. This was causing failures for some edge cases, i.e. reaching
the chunk row limit.

A better way would be to keep track of these indices instead of recomputing and adjusting
them on every row iteration.
pull/8202/head
Imran Zaheer 2025-09-25 10:28:06 +05:00
parent 292247e2cc
commit 9f52f6a9a1
3 changed files with 79 additions and 105 deletions

View File

@ -55,14 +55,14 @@ struct ColumnarWriteState
EmptyStripeReservation *emptyStripeReservation; EmptyStripeReservation *emptyStripeReservation;
ColumnarOptions options; ColumnarOptions options;
ChunkData *chunkData; ChunkData *chunkData;
uint32 currentChunkRowIndex;
uint32 currentChunkIndex;
/* /*
* accounting for creating new chunks groups when * accounting for creating new chunks groups when
* size limit reaches * size limit reaches
*/ */
Size currentChunkBytes; Size currentChunkBytes;
uint32 earlySerializedRowCount;
uint32 earlySerializedChunkCount;
List *chunkGroupRowCounts; List *chunkGroupRowCounts;
@ -216,23 +216,16 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu
} }
writeState->currentChunkBytes = 0; writeState->currentChunkBytes = 0;
writeState->earlySerializedRowCount = 0; writeState->currentChunkIndex = 0;
writeState->earlySerializedChunkCount = 0; writeState->currentChunkRowIndex = 0;
/* Ensure maxChunkSize is set with a reasonable default */ /* Ensure maxChunkSize is set with a reasonable default */
Assert(options->maxChunkSize >= CHUNK_GROUP_SIZE_MINIMUM && Assert(options->maxChunkSize >= CHUNK_GROUP_SIZE_MINIMUM &&
options->maxChunkSize <= CHUNK_GROUP_SIZE_MAXIMUM); options->maxChunkSize <= CHUNK_GROUP_SIZE_MAXIMUM);
} }
uint32 chunkIndex = stripeBuffers->rowCount / chunkRowCount; uint32 chunkIndex = writeState->currentChunkIndex;
uint32 chunkRowIndex = stripeBuffers->rowCount % chunkRowCount; uint32 chunkRowIndex = writeState->currentChunkRowIndex;
/* Adjust the indices if some chunks were serialized early */
if (writeState->earlySerializedRowCount)
{
chunkIndex = chunkIndex + writeState->earlySerializedChunkCount;
chunkRowIndex = chunkRowIndex - writeState->earlySerializedRowCount;
}
/* /*
* Calculate total serialized current row size without actually serializing. * Calculate total serialized current row size without actually serializing.
@ -293,19 +286,14 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu
* Size limit reached, now serialize upto the last row. * Size limit reached, now serialize upto the last row.
* We make sure not to serialize the current row data and only upto * We make sure not to serialize the current row data and only upto
* the last row, so we use `chunkRowIndex` instead of `chunkRowIndex + 1` * the last row, so we use `chunkRowIndex` instead of `chunkRowIndex + 1`
* in order to skip current row. * in order to skip current row. Current row will go in the next chunk.
*/ */
SerializeChunkData(writeState, chunkIndex, chunkRowIndex); SerializeChunkData(writeState, chunkIndex, chunkRowIndex);
writeState->earlySerializedChunkCount++;
writeState->currentChunkBytes = 0; writeState->currentChunkBytes = 0;
writeState->earlySerializedRowCount = writeState->earlySerializedRowCount + chunkRowIndex;
/* Recreate and adjust the indices after deciding to start a new chunk */ /* Adjust the indices after deciding to start a new chunk */
chunkIndex = stripeBuffers->rowCount / chunkRowCount; chunkIndex = ++writeState->currentChunkIndex;
chunkRowIndex = stripeBuffers->rowCount % chunkRowCount; chunkRowIndex = writeState->currentChunkRowIndex = 0;
chunkIndex = chunkIndex + writeState->earlySerializedChunkCount;
chunkRowIndex = chunkRowIndex - writeState->earlySerializedRowCount;
} }
for (columnIndex = 0; columnIndex < columnCount; columnIndex++) for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
@ -351,11 +339,21 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu
{ {
SerializeChunkData(writeState, chunkIndex, chunkRowCount); SerializeChunkData(writeState, chunkIndex, chunkRowCount);
writeState->currentChunkBytes = 0; writeState->currentChunkBytes = 0;
writeState->currentChunkIndex++;
writeState->currentChunkRowIndex = 0;
} }
uint64 writtenRowNumber = writeState->emptyStripeReservation->stripeFirstRowNumber + uint64 writtenRowNumber = writeState->emptyStripeReservation->stripeFirstRowNumber +
stripeBuffers->rowCount; stripeBuffers->rowCount;
stripeBuffers->rowCount++; stripeBuffers->rowCount++;
/*
* don't increment when chunk row limit was reached and new chunk was
* created, writeState->currentChunkRowIndex should suppose to be remain `0`
* in this case.
*/
if (chunkRowIndex != chunkRowCount - 1) writeState->currentChunkRowIndex++;
if (stripeBuffers->rowCount >= options->stripeRowCount) if (stripeBuffers->rowCount >= options->stripeRowCount)
{ {
ColumnarFlushPendingWrites(writeState); ColumnarFlushPendingWrites(writeState);
@ -580,22 +578,11 @@ FlushStripe(ColumnarWriteState *writeState)
TupleDesc tupleDescriptor = writeState->tupleDescriptor; TupleDesc tupleDescriptor = writeState->tupleDescriptor;
uint32 columnCount = tupleDescriptor->natts; uint32 columnCount = tupleDescriptor->natts;
uint32 chunkCount = stripeSkipList->chunkCount; uint32 chunkCount = stripeSkipList->chunkCount;
uint32 chunkRowCount = writeState->options.chunkRowCount; uint32 lastChunkIndex = writeState->currentChunkIndex;
uint32 lastChunkIndex = stripeBuffers->rowCount / chunkRowCount; uint32 lastChunkRowCount = writeState->currentChunkRowIndex;
uint32 lastChunkRowCount = stripeBuffers->rowCount % chunkRowCount;
uint64 stripeSize = 0; uint64 stripeSize = 0;
uint64 stripeRowCount = stripeBuffers->rowCount; uint64 stripeRowCount = stripeBuffers->rowCount;
if (writeState->earlySerializedRowCount)
{
/*
* Increment indices as a chunk has beed serialized early
* because of reaching its size limit
*/
lastChunkIndex = lastChunkIndex + writeState->earlySerializedChunkCount;
lastChunkRowCount = lastChunkRowCount - writeState->earlySerializedRowCount;
}
elog(DEBUG1, "Flushing Stripe of size %d", stripeBuffers->rowCount); elog(DEBUG1, "Flushing Stripe of size %d", stripeBuffers->rowCount);
Oid relationId = RelidByRelfilenumber(RelationTablespace_compat( Oid relationId = RelidByRelfilenumber(RelationTablespace_compat(

View File

@ -20,89 +20,79 @@ CREATE TABLE test_oversized_row (
huge_text TEXT huge_text TEXT
) USING columnar WITH ( ) USING columnar WITH (
columnar.chunk_group_row_limit = 1000, columnar.chunk_group_row_limit = 1000,
columnar.stripe_row_limit = 5000, columnar.stripe_row_limit = 1500,
columnar.chunk_group_size_limit = 128 columnar.chunk_group_size_limit = 128
); );
-- test with chunk & stripe row limit reached
INSERT INTO test_oversized_row INSERT INTO test_oversized_row
SELECT gs, repeat('Y', 2*1024*1024) -- 2 MB text SELECT gs, repeat('Y', 1*1024*1024) -- 1 MB text
FROM generate_series(1, 600) AS gs; FROM generate_series(1, 1600) AS gs;
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Flushing Stripe of size 600 DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
-- test VACUUM FULL DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
VACUUM FULL test_oversized_row; DEBUG: Flushing Stripe of size 1500
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group DEBUG: Flushing Stripe of size 100
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Flushing Stripe of size 600
SET client_min_messages TO warning; SET client_min_messages TO warning;
-- try verifying the data integrity -- try verifying the data integrity
SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass; SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass;
relation | storage_id | stripe_num | chunk_group_num | row_count relation | storage_id | stripe_num | chunk_group_num | row_count
--------------------------------------------------------------------- ---------------------------------------------------------------------
test_oversized_row | 10000000262 | 1 | 0 | 63 test_oversized_row | 10000000261 | 1 | 0 | 127
test_oversized_row | 10000000262 | 1 | 1 | 63 test_oversized_row | 10000000261 | 1 | 1 | 127
test_oversized_row | 10000000262 | 1 | 2 | 63 test_oversized_row | 10000000261 | 1 | 2 | 127
test_oversized_row | 10000000262 | 1 | 3 | 63 test_oversized_row | 10000000261 | 1 | 3 | 127
test_oversized_row | 10000000262 | 1 | 4 | 63 test_oversized_row | 10000000261 | 1 | 4 | 127
test_oversized_row | 10000000262 | 1 | 5 | 63 test_oversized_row | 10000000261 | 1 | 5 | 127
test_oversized_row | 10000000262 | 1 | 6 | 63 test_oversized_row | 10000000261 | 1 | 6 | 127
test_oversized_row | 10000000262 | 1 | 7 | 63 test_oversized_row | 10000000261 | 1 | 7 | 127
test_oversized_row | 10000000262 | 1 | 8 | 63 test_oversized_row | 10000000261 | 1 | 8 | 127
test_oversized_row | 10000000262 | 1 | 9 | 33 test_oversized_row | 10000000261 | 1 | 9 | 127
(10 rows) test_oversized_row | 10000000261 | 1 | 10 | 127
test_oversized_row | 10000000261 | 1 | 11 | 103
test_oversized_row | 10000000261 | 2 | 0 | 100
(13 rows)
SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass; SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass;
relation | storage_id | stripe_num | file_offset | data_length | column_count | chunk_row_count | row_count | chunk_group_count | first_row_number relation | storage_id | stripe_num | file_offset | data_length | column_count | chunk_row_count | row_count | chunk_group_count | first_row_number
--------------------------------------------------------------------- ---------------------------------------------------------------------
test_oversized_row | 10000000262 | 1 | 16336 | 1258296154 | 2 | 1000 | 600 | 10 | 1 test_oversized_row | 10000000261 | 1 | 16336 | 1572876378 | 2 | 1000 | 1500 | 12 | 1
(1 row) test_oversized_row | 10000000261 | 2 | 1572895424 | 104858426 | 2 | 1000 | 100 | 1 | 1501
(2 rows)
SELECT COUNT(*) FROM test_oversized_row; SELECT COUNT(*) FROM test_oversized_row;
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
600 1600
(1 row) (1 row)
SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10; SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10;
id | length id | length
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 | 2097152 1 | 1048576
2 | 2097152 2 | 1048576
3 | 2097152 3 | 1048576
4 | 2097152 4 | 1048576
5 | 2097152 5 | 1048576
6 | 2097152 6 | 1048576
7 | 2097152 7 | 1048576
8 | 2097152 8 | 1048576
9 | 2097152 9 | 1048576
10 | 2097152 10 | 1048576
(10 rows) (10 rows)
SELECT SUM(LENGTH(huge_text)) = 1258291200 AS is_equal FROM test_oversized_row;
is_equal
---------------------------------------------------------------------
t
(1 row)
\dt+ test_oversized_row \dt+ test_oversized_row
List of relations List of relations
Schema | Name | Type | Owner | Persistence | Size | Description Schema | Name | Type | Owner | Persistence | Size | Description
--------------------------------------------------------------------- ---------------------------------------------------------------------
columnar_chunk_test | test_oversized_row | table | postgres | permanent | 1204 MB | columnar_chunk_test | test_oversized_row | table | postgres | permanent | 1605 MB |
(1 row) (1 row)
-- test edge case setting chunk_group_size_limit = 1024 -- test edge case setting chunk_group_size_limit = 1024
@ -122,6 +112,10 @@ SELECT gs, repeat('Y', 2*1024*1024) -- 2 MB text
FROM generate_series(1, 600) AS gs; FROM generate_series(1, 600) AS gs;
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (1073741824 bytes), storing in a separate chunk group DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (1073741824 bytes), storing in a separate chunk group
DEBUG: Flushing Stripe of size 600 DEBUG: Flushing Stripe of size 600
-- test VACUUM FULL
VACUUM FULL test_oversized_row;
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (1073741824 bytes), storing in a separate chunk group
DEBUG: Flushing Stripe of size 600
SET client_min_messages TO warning; SET client_min_messages TO warning;
-- try verifying the data integrity -- try verifying the data integrity
SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass; SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass;
@ -158,12 +152,6 @@ SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10;
10 | 2097152 10 | 2097152
(10 rows) (10 rows)
SELECT SUM(LENGTH(huge_text)) = 1258291200 AS is_equal FROM test_oversized_row;
is_equal
---------------------------------------------------------------------
t
(1 row)
\dt+ test_oversized_row \dt+ test_oversized_row
List of relations List of relations
Schema | Name | Type | Owner | Persistence | Size | Description Schema | Name | Type | Owner | Persistence | Size | Description

View File

@ -25,16 +25,14 @@ CREATE TABLE test_oversized_row (
huge_text TEXT huge_text TEXT
) USING columnar WITH ( ) USING columnar WITH (
columnar.chunk_group_row_limit = 1000, columnar.chunk_group_row_limit = 1000,
columnar.stripe_row_limit = 5000, columnar.stripe_row_limit = 1500,
columnar.chunk_group_size_limit = 128 columnar.chunk_group_size_limit = 128
); );
-- test with chunk & stripe row limit reached
INSERT INTO test_oversized_row INSERT INTO test_oversized_row
SELECT gs, repeat('Y', 2*1024*1024) -- 2 MB text SELECT gs, repeat('Y', 1*1024*1024) -- 1 MB text
FROM generate_series(1, 600) AS gs; FROM generate_series(1, 1600) AS gs;
-- test VACUUM FULL
VACUUM FULL test_oversized_row;
SET client_min_messages TO warning; SET client_min_messages TO warning;
@ -43,7 +41,6 @@ SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regcla
SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass; SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass;
SELECT COUNT(*) FROM test_oversized_row; SELECT COUNT(*) FROM test_oversized_row;
SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10; SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10;
SELECT SUM(LENGTH(huge_text)) = 1258291200 AS is_equal FROM test_oversized_row;
\dt+ test_oversized_row \dt+ test_oversized_row
@ -65,6 +62,9 @@ INSERT INTO test_oversized_row
SELECT gs, repeat('Y', 2*1024*1024) -- 2 MB text SELECT gs, repeat('Y', 2*1024*1024) -- 2 MB text
FROM generate_series(1, 600) AS gs; FROM generate_series(1, 600) AS gs;
-- test VACUUM FULL
VACUUM FULL test_oversized_row;
SET client_min_messages TO warning; SET client_min_messages TO warning;
-- try verifying the data integrity -- try verifying the data integrity
@ -72,7 +72,6 @@ SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regcla
SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass; SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass;
SELECT COUNT(*) FROM test_oversized_row; SELECT COUNT(*) FROM test_oversized_row;
SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10; SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10;
SELECT SUM(LENGTH(huge_text)) = 1258291200 AS is_equal FROM test_oversized_row;
\dt+ test_oversized_row \dt+ test_oversized_row
DROP TABLE test_oversized_row; DROP TABLE test_oversized_row;