diff --git a/src/backend/columnar/README.md b/src/backend/columnar/README.md index b2e0fdf3b..9c4a4db83 100644 --- a/src/backend/columnar/README.md +++ b/src/backend/columnar/README.md @@ -111,6 +111,10 @@ The following options are available: chunk for _newly-inserted_ data. Existing chunks of data will not be changed and may have more rows than this maximum value. The default value is `10000`. +* **columnar.chunk_group_size_limit**: ``<integer>`` - the maximum size in MB per + chunk group for _newly-inserted_ data. Existing chunks of data will not be + changed and may be larger than this maximum value. The default + value is `256`. View options for all tables with: @@ -125,6 +129,7 @@ following GUCs: * `columnar.compression_level` * `columnar.stripe_row_limit` * `columnar.chunk_group_row_limit` +* `columnar.chunk_group_size_limit` GUCs only affect newly-created *tables*, not any newly-created *stripes* on an existing table. diff --git a/src/backend/columnar/columnar.c b/src/backend/columnar/columnar.c index 4914bbc3a..9250596fb 100644 --- a/src/backend/columnar/columnar.c +++ b/src/backend/columnar/columnar.c @@ -29,6 +29,7 @@ /* Default values for option parameters */ #define DEFAULT_STRIPE_ROW_COUNT 150000 #define DEFAULT_CHUNK_ROW_COUNT 10000 +#define DEFAULT_CHUNK_GROUP_SIZE 256 /* 256 MB */ #if HAVE_LIBZSTD #define DEFAULT_COMPRESSION_TYPE COMPRESSION_ZSTD @@ -41,6 +42,7 @@ int columnar_compression = DEFAULT_COMPRESSION_TYPE; int columnar_stripe_row_limit = DEFAULT_STRIPE_ROW_COUNT; int columnar_chunk_group_row_limit = DEFAULT_CHUNK_ROW_COUNT; +int columnar_chunk_group_size_limit = DEFAULT_CHUNK_GROUP_SIZE; int columnar_compression_level = 3; static const struct config_enum_entry columnar_compression_options[] = @@ -117,6 +119,19 @@ columnar_init_gucs() NULL, NULL, NULL); + + DefineCustomIntVariable("columnar.chunk_group_size_limit", + "Maximum size per chunk group.", + NULL, + &columnar_chunk_group_size_limit, + DEFAULT_CHUNK_GROUP_SIZE, + 
CHUNK_GROUP_SIZE_MINIMUM, + CHUNK_GROUP_SIZE_MAXIMUM, + PGC_USERSET, + 0, + NULL, + NULL, + NULL); } diff --git a/src/backend/columnar/columnar_compression.c b/src/backend/columnar/columnar_compression.c index 2ff35da98..4f955f6eb 100644 --- a/src/backend/columnar/columnar_compression.c +++ b/src/backend/columnar/columnar_compression.c @@ -270,3 +270,45 @@ DecompressBuffer(StringInfo buffer, } } } + + +/* + * Return worst-case compressed size for the given input size and + * compression type; types not handled in this build raise an ERROR. + */ +int +GetMaxCompressedLength(int size, CompressionType compressionType) +{ + Assert(compressionType >= 0 && compressionType < COMPRESSION_COUNT); + + switch (compressionType) + { + case COMPRESSION_NONE: + { + return size; + } +#if HAVE_CITUS_LIBLZ4 + case COMPRESSION_LZ4: + { + return LZ4_compressBound(size); + } +#endif + +#if HAVE_LIBZSTD + case COMPRESSION_ZSTD: + { + return (int) ZSTD_compressBound(size); + } +#endif + + case COMPRESSION_PG_LZ: + { + return (int) (PGLZ_MAX_OUTPUT(size) + COLUMNAR_COMPRESS_HDRSZ); + } + + default: + { + ereport(ERROR, (errmsg("unexpected compression type: %d", compressionType))); + } + } +} \ No newline at end of file diff --git a/src/backend/columnar/columnar_metadata.c b/src/backend/columnar/columnar_metadata.c index cd62c8b0c..af5b2ec90 100644 --- a/src/backend/columnar/columnar_metadata.c +++ b/src/backend/columnar/columnar_metadata.c @@ -147,13 +147,21 @@ static void CheckStripeMetadataConsistency(StripeMetadata *stripeMetadata); PG_FUNCTION_INFO_V1(columnar_relation_storageid); -/* constants for columnar.options */ -#define Natts_columnar_options 5 +/* + * constants for columnar.options + * + * The attnum for chunk_group_size_limit will be 6 because + * we don't define this column in the table definition; + * we add it during the upgrade in citus_columnar--13.2-1--14.0-1.sql, + * so ALTER TABLE automatically assigns it attnum 6. 
+ */ +#define Natts_columnar_options 6 #define Anum_columnar_options_regclass 1 #define Anum_columnar_options_chunk_group_row_limit 2 #define Anum_columnar_options_stripe_row_limit 3 #define Anum_columnar_options_compression_level 4 #define Anum_columnar_options_compression 5 +#define Anum_columnar_options_chunk_group_size_limit 6 /* ---------------- * columnar.options definition. @@ -166,6 +174,7 @@ typedef struct FormData_columnar_options int32 stripe_row_limit; int32 compressionLevel; NameData compression; + int32 chunk_group_size_limit; #ifdef CATALOG_VARLEN /* variable-length fields start here */ #endif @@ -231,6 +240,7 @@ InitColumnarOptions(Oid regclass) ColumnarOptions defaultOptions = { .chunkRowCount = columnar_chunk_group_row_limit, .stripeRowCount = columnar_stripe_row_limit, + .maxChunkSize = columnar_chunk_group_size_limit, .compressionType = columnar_compression, .compressionLevel = columnar_compression_level }; @@ -273,6 +283,21 @@ ParseColumnarRelOptions(List *reloptions, ColumnarOptions *options) (uint64) CHUNK_ROW_COUNT_MAXIMUM))); } } + else if (strcmp(elem->defname, "chunk_group_size_limit") == 0) + { + options->maxChunkSize = (elem->arg == NULL) ? + columnar_chunk_group_size_limit : defGetInt64(elem); + + if (options->maxChunkSize < CHUNK_GROUP_SIZE_MINIMUM || + options->maxChunkSize > CHUNK_GROUP_SIZE_MAXIMUM) + { + ereport(ERROR, (errmsg("chunk group size limit out of range"), + errhint("chunk group size limit must be between " + UINT64_FORMAT " and " UINT64_FORMAT, + (uint64) CHUNK_GROUP_SIZE_MINIMUM, + (uint64) CHUNK_GROUP_SIZE_MAXIMUM))); + } + } else if (strcmp(elem->defname, "stripe_row_limit") == 0) { options->stripeRowCount = (elem->arg == NULL) ? 
@@ -425,6 +450,7 @@ WriteColumnarOptions(Oid regclass, ColumnarOptions *options, bool overwrite) Int32GetDatum(options->stripeRowCount), Int32GetDatum(options->compressionLevel), 0, /* to be filled below */ + Int32GetDatum(options->maxChunkSize), }; NameData compressionName = { 0 }; @@ -458,6 +484,7 @@ WriteColumnarOptions(Oid regclass, ColumnarOptions *options, bool overwrite) update[Anum_columnar_options_stripe_row_limit - 1] = true; update[Anum_columnar_options_compression_level - 1] = true; update[Anum_columnar_options_compression - 1] = true; + update[Anum_columnar_options_chunk_group_size_limit - 1] = true; HeapTuple tuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, nulls, update); @@ -581,6 +608,7 @@ ReadColumnarOptions(Oid regclass, ColumnarOptions *options) options->chunkRowCount = tupOptions->chunk_group_row_limit; options->stripeRowCount = tupOptions->stripe_row_limit; + options->maxChunkSize = tupOptions->chunk_group_size_limit; options->compressionLevel = tupOptions->compressionLevel; options->compressionType = ParseCompressionType(NameStr(tupOptions->compression)); } @@ -590,6 +618,7 @@ ReadColumnarOptions(Oid regclass, ColumnarOptions *options) options->compressionType = columnar_compression; options->stripeRowCount = columnar_stripe_row_limit; options->chunkRowCount = columnar_chunk_group_row_limit; + options->maxChunkSize = columnar_chunk_group_size_limit; options->compressionLevel = columnar_compression_level; } diff --git a/src/backend/columnar/columnar_writer.c b/src/backend/columnar/columnar_writer.c index e698d1a41..8e2a9259a 100644 --- a/src/backend/columnar/columnar_writer.c +++ b/src/backend/columnar/columnar_writer.c @@ -61,6 +61,14 @@ struct ColumnarWriteState EmptyStripeReservation *emptyStripeReservation; ColumnarOptions options; ChunkData *chunkData; + uint32 currentChunkRowIndex; + uint32 currentChunkIndex; + + /* + * accounting for creating new chunks groups when + * size limit reaches + */ + Size currentChunkBytes; List 
*chunkGroupRowCounts; @@ -73,6 +81,8 @@ struct ColumnarWriteState StringInfo compressionBuffer; }; +static StripeSkipList * ExpandStripeSkipListChunks(StripeSkipList *stripeSkipList, uint32 newChunkIndex); +static StripeBuffers * ExpandStripeBuffersChunks(StripeBuffers *stripeBuffers, uint32 newChunkIndex); static StripeBuffers * CreateEmptyStripeBuffers(uint32 stripeMaxRowCount, uint32 chunkRowCount, uint32 columnCount); @@ -174,13 +184,16 @@ uint64 ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *columnNulls) { uint32 columnIndex = 0; + Size totalRowSize = 0; StripeBuffers *stripeBuffers = writeState->stripeBuffers; StripeSkipList *stripeSkipList = writeState->stripeSkipList; uint32 columnCount = writeState->tupleDescriptor->natts; ColumnarOptions *options = &writeState->options; const uint32 chunkRowCount = options->chunkRowCount; + const uint32 maxChunkCount = (options->stripeRowCount / chunkRowCount) + 1; ChunkData *chunkData = writeState->chunkData; MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext); + bool shouldSerializeEarly = false; if (stripeBuffers == NULL) { @@ -209,10 +222,99 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu { chunkData->valueBufferArray[columnIndex] = makeStringInfo(); } + + writeState->currentChunkBytes = 0; + writeState->currentChunkIndex = 0; + writeState->currentChunkRowIndex = 0; + + /* Ensure maxChunkSize is set with a reasonable default */ + Assert(options->maxChunkSize >= CHUNK_GROUP_SIZE_MINIMUM && + options->maxChunkSize <= CHUNK_GROUP_SIZE_MAXIMUM); } - uint32 chunkIndex = stripeBuffers->rowCount / chunkRowCount; - uint32 chunkRowIndex = stripeBuffers->rowCount % chunkRowCount; + uint32 chunkIndex = writeState->currentChunkIndex; + uint32 chunkRowIndex = writeState->currentChunkRowIndex; + + /* + * A chunk group closed by the row limit advances currentChunkIndex without + * growing the per-column chunk arrays. 
After earlier size-limit splits the + * next index can land past the maxChunkCount entries assumed here, so make + * room before the first row of such a chunk group is written. + */ + if (chunkRowIndex == 0 && chunkIndex >= maxChunkCount) + { + ExpandStripeBuffersChunks(stripeBuffers, chunkIndex); + ExpandStripeSkipListChunks(stripeSkipList, chunkIndex); + } + + /* + * Calculate total serialized current row size without actually serializing. 
+ * This uses the same logic as SerializeSingleDatum but only computes sizes. + */ + for (columnIndex = 0; columnIndex < columnCount; columnIndex++) + { + if (!columnNulls[columnIndex]) + { + Form_pg_attribute attributeForm = + TupleDescAttr(writeState->tupleDescriptor, columnIndex); + int columnTypeLength = attributeForm->attlen; + char columnTypeAlign = attributeForm->attalign; + + uint32 datumLength = att_addlength_datum(0, columnTypeLength, columnValues[columnIndex]); + uint32 datumLengthAligned = att_align_nominal(datumLength, columnTypeAlign); + + totalRowSize += (Size) datumLengthAligned; + } + } + + /* + * Check if we need to serialize a chunk group earlier due to size limits. + * We also need to account for the worst-case compressed data size, which can + * also exceed the limits. + */ + if (chunkRowIndex > 0) + { + int64 chunkGroupLimit = CHUNK_GROUP_SIZE_MB_TO_BYTES(options->maxChunkSize); + int64 maxCompressedSize = GetMaxCompressedLength(writeState->currentChunkBytes, + writeState->options.compressionType); + + shouldSerializeEarly = (maxCompressedSize + totalRowSize > chunkGroupLimit); + } + + /* + * If adding the current row would exceed the defined chunk group size limit, we + * put the current row in a separate chunk group and serialize + * all the rows before it. + */ + if (shouldSerializeEarly) + { + elog(DEBUG1, "Row size (%zu bytes) exceeds chunk group size limit (%zu bytes), " + "storing in a separate chunk group", + totalRowSize, CHUNK_GROUP_SIZE_MB_TO_BYTES(options->maxChunkSize)); + + /* + * Before putting the row in a separate chunk we have to allocate space + * for the new chunk if maxChunkCount is reached. + */ + if (chunkIndex + 1 >= maxChunkCount) + { + ExpandStripeBuffersChunks(stripeBuffers, chunkIndex + 1); + ExpandStripeSkipListChunks(stripeSkipList, chunkIndex + 1); + } + + /* + * Size limit reached, now serialize up to the last row. 
+ * We make sure not to serialize the current row data and only up to + * the last row, so we use `chunkRowIndex` instead of `chunkRowIndex + 1` + * in order to skip current row. Current row will go in the next chunk. + */ + SerializeChunkData(writeState, chunkIndex, chunkRowIndex); + writeState->currentChunkBytes = 0; + + /* Adjust the indices after deciding to start a new chunk */ + chunkIndex = ++writeState->currentChunkIndex; + chunkRowIndex = writeState->currentChunkRowIndex = 0; + } for (columnIndex = 0; columnIndex < columnCount; columnIndex++) { @@ -249,17 +339,29 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu chunkSkipNode->rowCount++; } + writeState->currentChunkBytes += totalRowSize; stripeSkipList->chunkCount = chunkIndex + 1; /* last row of the chunk is inserted serialize the chunk */ if (chunkRowIndex == chunkRowCount - 1) { SerializeChunkData(writeState, chunkIndex, chunkRowCount); + writeState->currentChunkBytes = 0; + writeState->currentChunkIndex++; + writeState->currentChunkRowIndex = 0; } uint64 writtenRowNumber = writeState->emptyStripeReservation->stripeFirstRowNumber + stripeBuffers->rowCount; stripeBuffers->rowCount++; + + /* + * don't increment when the chunk row limit was reached and a new chunk was + * started; writeState->currentChunkRowIndex is supposed to remain `0` + * in this case. + */ + if (chunkRowIndex != chunkRowCount - 1) writeState->currentChunkRowIndex++; + if (stripeBuffers->rowCount >= options->stripeRowCount) { ColumnarFlushPendingWrites(writeState); @@ -318,6 +420,86 @@ ColumnarWritePerTupleContext(ColumnarWriteState *state) return state->perTupleContext; } +/* + * ExpandStripeBuffersChunks adds one more chunk to all columns in an existing + * StripeBuffers structure using repalloc. 
+ */ +static StripeBuffers * +ExpandStripeBuffersChunks(StripeBuffers *stripeBuffers, uint32 newChunkIndex) +{ + if (stripeBuffers == NULL || stripeBuffers->columnBuffersArray == NULL) + { + return NULL; + } + + uint32 columnCount = stripeBuffers->columnCount; + + /* Iterate through all columns and expand their chunk arrays */ + for (uint32 columnIndex = 0; columnIndex < columnCount; columnIndex++) + { + ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex]; + if (columnBuffers == NULL || columnBuffers->chunkBuffersArray == NULL) + { + continue; + } + + /* Use repalloc to expand the chunkBuffersArray */ + columnBuffers->chunkBuffersArray = (ColumnChunkBuffers **) + repalloc(columnBuffers->chunkBuffersArray, + (newChunkIndex + 1) * sizeof(ColumnChunkBuffers *)); + + /* Allocate and initialize the new chunk buffer */ + columnBuffers->chunkBuffersArray[newChunkIndex] = palloc0(sizeof(ColumnChunkBuffers)); + columnBuffers->chunkBuffersArray[newChunkIndex]->existsBuffer = NULL; + columnBuffers->chunkBuffersArray[newChunkIndex]->valueBuffer = NULL; + columnBuffers->chunkBuffersArray[newChunkIndex]->valueCompressionType = COMPRESSION_NONE; + } + + return stripeBuffers; +} + + +/* + * ExpandStripeSkipListChunks adds one more chunk to all columns in an existing + * StripeSkipList structure using repalloc. 
+ */ +static StripeSkipList * +ExpandStripeSkipListChunks(StripeSkipList *stripeSkipList, uint32 newChunkIndex) +{ + if (stripeSkipList == NULL || stripeSkipList->chunkSkipNodeArray == NULL) + { + return NULL; + } + + uint32 columnCount = stripeSkipList->columnCount; + + /* Iterate through all columns and expand their chunk skip node arrays */ + for (uint32 columnIndex = 0; columnIndex < columnCount; columnIndex++) + { + if (stripeSkipList->chunkSkipNodeArray[columnIndex] == NULL) + { + continue; + } + + /* Use repalloc to expand the chunk skip node array for this column */ + stripeSkipList->chunkSkipNodeArray[columnIndex] = (ColumnChunkSkipNode *) + repalloc(stripeSkipList->chunkSkipNodeArray[columnIndex], + (newChunkIndex + 1) * sizeof(ColumnChunkSkipNode)); + + /* Initialize the new chunk skip node (equivalent to palloc0 behavior) */ + memset(&stripeSkipList->chunkSkipNodeArray[columnIndex][newChunkIndex], + 0, sizeof(ColumnChunkSkipNode)); + } + + /* Update the chunk count if the new chunk index is beyond current count */ + if (newChunkIndex >= stripeSkipList->chunkCount) + { + stripeSkipList->chunkCount = newChunkIndex + 1; + } + + return stripeSkipList; +} + /* * CreateEmptyStripeBuffers allocates an empty StripeBuffers structure with the given @@ -404,9 +586,8 @@ FlushStripe(ColumnarWriteState *writeState) TupleDesc tupleDescriptor = writeState->tupleDescriptor; uint32 columnCount = tupleDescriptor->natts; uint32 chunkCount = stripeSkipList->chunkCount; - uint32 chunkRowCount = writeState->options.chunkRowCount; - uint32 lastChunkIndex = stripeBuffers->rowCount / chunkRowCount; - uint32 lastChunkRowCount = stripeBuffers->rowCount % chunkRowCount; + uint32 lastChunkIndex = writeState->currentChunkIndex; + uint32 lastChunkRowCount = writeState->currentChunkRowIndex; uint64 stripeSize = 0; uint64 stripeRowCount = stripeBuffers->rowCount; diff --git a/src/backend/columnar/sql/citus_columnar--13.2-1--14.0-1.sql 
b/src/backend/columnar/sql/citus_columnar--13.2-1--14.0-1.sql index 016c78f6b..cecdd864c 100644 --- a/src/backend/columnar/sql/citus_columnar--13.2-1--14.0-1.sql +++ b/src/backend/columnar/sql/citus_columnar--13.2-1--14.0-1.sql @@ -1,2 +1,139 @@ -- citus_columnar--13.2-1--14.0-1 -- bump version to 14.0-1 + +-- Support for new column `chunk_group_size_limit`; the DEFAULT backfills rows of existing columnar tables +ALTER TABLE columnar_internal.options ADD COLUMN chunk_group_size_limit int NOT NULL DEFAULT 256; + +-- After adding a new column `chunk_group_size_limit` +-- to table `columnar_internal.options` we have to redefine +-- all the related functions. In this case we have only table set +-- and reset functions to be redefined. + +DROP FUNCTION IF EXISTS alter_columnar_table_set, alter_columnar_table_reset; + +CREATE OR REPLACE FUNCTION pg_catalog.alter_columnar_table_set( + table_name regclass, + chunk_group_row_limit int DEFAULT NULL, + stripe_row_limit int DEFAULT NULL, + chunk_group_size_limit int DEFAULT NULL, + compression name DEFAULT null, + compression_level int DEFAULT NULL) + RETURNS void + LANGUAGE plpgsql AS +$alter_columnar_table_set$ +declare + noop BOOLEAN := true; + cmd TEXT := 'ALTER TABLE ' || table_name::text || ' SET ('; +begin + if (chunk_group_row_limit is not null) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.chunk_group_row_limit=' || chunk_group_row_limit; + noop := false; + end if; + if (stripe_row_limit is not null) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.stripe_row_limit=' || stripe_row_limit; + noop := false; + end if; + if (chunk_group_size_limit is not null) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.chunk_group_size_limit=' || chunk_group_size_limit; + noop := false; + end if; + if (compression is not null) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.compression=' || 
compression; + noop := false; + end if; + if (compression_level is not null) then + 
if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.compression_level=' || compression_level; + noop := false; + end if; + cmd := cmd || ')'; + if (not noop) then + execute cmd; + end if; + return; +end; +$alter_columnar_table_set$; + +COMMENT ON FUNCTION pg_catalog.alter_columnar_table_set( + table_name regclass, + chunk_group_row_limit int, + stripe_row_limit int, + chunk_group_size_limit int, + compression name, + compression_level int) +IS 'set one or more options on a columnar table, when set to NULL no change is made'; + +CREATE OR REPLACE FUNCTION pg_catalog.alter_columnar_table_reset( + table_name regclass, + chunk_group_row_limit bool DEFAULT false, + stripe_row_limit bool DEFAULT false, + chunk_group_size_limit bool DEFAULT false, + compression bool DEFAULT false, + compression_level bool DEFAULT false) + RETURNS void + LANGUAGE plpgsql AS +$alter_columnar_table_reset$ +declare + noop BOOLEAN := true; + cmd TEXT := 'ALTER TABLE ' || table_name::text || ' RESET ('; +begin + if (chunk_group_row_limit) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.chunk_group_row_limit'; + noop := false; + end if; + if (stripe_row_limit) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.stripe_row_limit'; + noop := false; + end if; + if (chunk_group_size_limit) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.chunk_group_size_limit'; + noop := false; + end if; + if (compression) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.compression'; + noop := false; + end if; + if (compression_level) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.compression_level'; + noop := false; + end if; + cmd := cmd || ')'; + if (not noop) then + execute cmd; + end if; + return; +end; +$alter_columnar_table_reset$; + +COMMENT ON FUNCTION pg_catalog.alter_columnar_table_reset( + table_name regclass, + 
chunk_group_row_limit bool, + stripe_row_limit bool, + chunk_group_size_limit bool, + compression bool, + compression_level bool) +IS 'reset on or more options on a columnar table to the system defaults'; + + +-- Redefine view for columnar options this time adding the new column `chunk_group_size_limit` + +DROP VIEW IF EXISTS columnar.options; + +CREATE VIEW columnar.options WITH (security_barrier) AS + SELECT regclass AS relation, chunk_group_row_limit, chunk_group_size_limit, + stripe_row_limit, compression, compression_level + FROM columnar_internal.options o, pg_class c + WHERE o.regclass = c.oid + AND pg_has_role(c.relowner, 'USAGE'); +COMMENT ON VIEW columnar.options + IS 'Columnar options for tables on which the current user has ownership privileges.'; +GRANT SELECT ON columnar.options TO PUBLIC; diff --git a/src/backend/columnar/sql/downgrades/citus_columnar--14.0-1--13.2-1.sql b/src/backend/columnar/sql/downgrades/citus_columnar--14.0-1--13.2-1.sql index 0504d0048..24f0cc5dc 100644 --- a/src/backend/columnar/sql/downgrades/citus_columnar--14.0-1--13.2-1.sql +++ b/src/backend/columnar/sql/downgrades/citus_columnar--14.0-1--13.2-1.sql @@ -1,2 +1,120 @@ -- citus_columnar--14.0-1--13.2-1 -- downgrade version to 13.2-1 + + +-- Remove column `chunk_group_size_limit` +ALTER TABLE columnar_internal.options DROP COLUMN chunk_group_size_limit; + +-- Remove column `chunk_group_size_limit` by redefining the functions & views +DROP VIEW IF EXISTS columnar.options; +DROP FUNCTION IF EXISTS alter_columnar_table_set, alter_columnar_table_reset; + + +-- Redefine +CREATE VIEW columnar.options WITH (security_barrier) AS + SELECT regclass AS relation, chunk_group_row_limit, + stripe_row_limit, compression, compression_level + FROM columnar_internal.options o, pg_class c + WHERE o.regclass = c.oid + AND pg_has_role(c.relowner, 'USAGE'); +COMMENT ON VIEW columnar.options + IS 'Columnar options for tables on which the current user has ownership privileges.'; +GRANT SELECT ON 
columnar.options TO PUBLIC; + +CREATE OR REPLACE FUNCTION pg_catalog.alter_columnar_table_set( + table_name regclass, + chunk_group_row_limit int DEFAULT NULL, + stripe_row_limit int DEFAULT NULL, + compression name DEFAULT null, + compression_level int DEFAULT NULL) + RETURNS void + LANGUAGE plpgsql AS +$alter_columnar_table_set$ +declare + noop BOOLEAN := true; + cmd TEXT := 'ALTER TABLE ' || table_name::text || ' SET ('; +begin + if (chunk_group_row_limit is not null) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.chunk_group_row_limit=' || chunk_group_row_limit; + noop := false; + end if; + if (stripe_row_limit is not null) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.stripe_row_limit=' || stripe_row_limit; + noop := false; + end if; + if (compression is not null) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.compression=' || compression; + noop := false; + end if; + if (compression_level is not null) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.compression_level=' || compression_level; + noop := false; + end if; + cmd := cmd || ')'; + if (not noop) then + execute cmd; + end if; + return; +end; +$alter_columnar_table_set$; + +COMMENT ON FUNCTION pg_catalog.alter_columnar_table_set( + table_name regclass, + chunk_group_row_limit int, + stripe_row_limit int, + compression name, + compression_level int) +IS 'set one or more options on a columnar table, when set to NULL no change is made'; + +CREATE OR REPLACE FUNCTION pg_catalog.alter_columnar_table_reset( + table_name regclass, + chunk_group_row_limit bool DEFAULT false, + stripe_row_limit bool DEFAULT false, + compression bool DEFAULT false, + compression_level bool DEFAULT false) + RETURNS void + LANGUAGE plpgsql AS +$alter_columnar_table_reset$ +declare + noop BOOLEAN := true; + cmd TEXT := 'ALTER TABLE ' || table_name::text || ' RESET ('; +begin + if (chunk_group_row_limit) 
then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.chunk_group_row_limit'; + noop := false; + end if; + if (stripe_row_limit) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.stripe_row_limit'; + noop := false; + end if; + if (compression) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.compression'; + noop := false; + end if; + if (compression_level) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.compression_level'; + noop := false; + end if; + cmd := cmd || ')'; + if (not noop) then + execute cmd; + end if; + return; +end; +$alter_columnar_table_reset$; + +COMMENT ON FUNCTION pg_catalog.alter_columnar_table_reset( + table_name regclass, + chunk_group_row_limit bool, + stripe_row_limit bool, + compression bool, + compression_level bool) +IS 'reset on or more options on a columnar table to the system defaults'; \ No newline at end of file diff --git a/src/include/columnar/columnar.h b/src/include/columnar/columnar.h index 1883be38b..b40d20b4a 100644 --- a/src/include/columnar/columnar.h +++ b/src/include/columnar/columnar.h @@ -50,6 +50,8 @@ #define CHUNK_ROW_COUNT_MAXIMUM 100000 #define COMPRESSION_LEVEL_MIN 1 #define COMPRESSION_LEVEL_MAX 19 +#define CHUNK_GROUP_SIZE_MINIMUM 1 +#define CHUNK_GROUP_SIZE_MAXIMUM 1024 /* going beyond 1024 cause enlargeStringInfo() go out of memory */ /* Columnar file signature */ #define COLUMNAR_VERSION_MAJOR 2 @@ -60,6 +62,7 @@ #define COLUMNAR_POSTSCRIPT_SIZE_LENGTH 1 #define COLUMNAR_POSTSCRIPT_SIZE_MAX 256 #define COLUMNAR_BYTES_PER_PAGE (BLCKSZ - SizeOfPageHeaderData) +#define CHUNK_GROUP_SIZE_MB_TO_BYTES(mb) ((Size)((mb) * 1024UL * 1024UL)) /*global variables for citus_columnar fake version Y */ #define CITUS_COLUMNAR_INTERNAL_VERSION "11.1-0" @@ -81,6 +84,7 @@ typedef struct ColumnarOptions { uint64 stripeRowCount; uint32 chunkRowCount; + uint32 maxChunkSize; CompressionType compressionType; int 
compressionLevel; } ColumnarOptions; @@ -229,6 +233,7 @@ typedef struct ColumnarWriteState ColumnarWriteState; extern int columnar_compression; extern int columnar_stripe_row_limit; extern int columnar_chunk_group_row_limit; +extern int columnar_chunk_group_size_limit; extern int columnar_compression_level; /* called when the user changes options on the given relation */ diff --git a/src/include/columnar/columnar_compression.h b/src/include/columnar/columnar_compression.h index 5b9710f20..d93d3fd97 100644 --- a/src/include/columnar/columnar_compression.h +++ b/src/include/columnar/columnar_compression.h @@ -30,5 +30,5 @@ extern bool CompressBuffer(StringInfo inputBuffer, int compressionLevel); extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType, uint64 decompressedSize); - +extern int GetMaxCompressedLength(int size, CompressionType compressionType); #endif /* COLUMNAR_COMPRESSION_H */ diff --git a/src/test/regress/columnar_schedule b/src/test/regress/columnar_schedule index 4c36e4ddd..ebb509544 100644 --- a/src/test/regress/columnar_schedule +++ b/src/test/regress/columnar_schedule @@ -33,3 +33,4 @@ test: columnar_recursive test: columnar_transactions test: columnar_matview test: columnar_memory +test: columnar_chunk_sizes diff --git a/src/test/regress/expected/columnar_chunk_sizes.out b/src/test/regress/expected/columnar_chunk_sizes.out new file mode 100644 index 000000000..3d6e5a5f4 --- /dev/null +++ b/src/test/regress/expected/columnar_chunk_sizes.out @@ -0,0 +1,163 @@ +CREATE SCHEMA columnar_chunk_test; +SET search_path TO columnar_chunk_test; +SET columnar.compression TO 'none'; +-- set to debug1 to see how many new chunks has been created during +-- chunk_group_size_limit overflow +SET client_min_messages TO debug1; +-- +-- ISSUE_6420 +-- +-- Issue: Automatically allocate a new chunk group instead of throwing error due to buffer size limits +-- Link: https://github.com/citusdata/citus/issues/6420 +-- +-- Insert rows that 
exceeds the chunk group size limit. +-- Adding 600 rows each with the size of 2MB will eventually exceeds the +-- limit of 1GB for enlargeStringInfo() but this should not fail. +-- Also setting chunk_group_size_limit to will exceed the max chunk groups limit 5000/1000 = 5, new +-- chunkgroup should be allocated automatically +CREATE TABLE test_oversized_row ( + id INTEGER, + huge_text TEXT +) USING columnar WITH ( + columnar.chunk_group_row_limit = 1000, + columnar.stripe_row_limit = 1500, + columnar.chunk_group_size_limit = 128 +); +-- test with chunk & stripe row limit reached +INSERT INTO test_oversized_row +SELECT gs, repeat('Y', 1*1024*1024) -- 1 MB text +FROM generate_series(1, 1600) AS gs; +DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group +DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group +DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group +DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group +DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group +DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group +DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group +DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group +DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group +DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group +DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group 
+DEBUG: Flushing Stripe of size 1500 +DEBUG: Flushing Stripe of size 100 +SET client_min_messages TO warning; +-- try verifying the data integrity +SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass; + relation | storage_id | stripe_num | chunk_group_num | row_count +--------------------------------------------------------------------- + test_oversized_row | 10000000261 | 1 | 0 | 127 + test_oversized_row | 10000000261 | 1 | 1 | 127 + test_oversized_row | 10000000261 | 1 | 2 | 127 + test_oversized_row | 10000000261 | 1 | 3 | 127 + test_oversized_row | 10000000261 | 1 | 4 | 127 + test_oversized_row | 10000000261 | 1 | 5 | 127 + test_oversized_row | 10000000261 | 1 | 6 | 127 + test_oversized_row | 10000000261 | 1 | 7 | 127 + test_oversized_row | 10000000261 | 1 | 8 | 127 + test_oversized_row | 10000000261 | 1 | 9 | 127 + test_oversized_row | 10000000261 | 1 | 10 | 127 + test_oversized_row | 10000000261 | 1 | 11 | 103 + test_oversized_row | 10000000261 | 2 | 0 | 100 +(13 rows) + +SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass; + relation | storage_id | stripe_num | file_offset | data_length | column_count | chunk_row_count | row_count | chunk_group_count | first_row_number +--------------------------------------------------------------------- + test_oversized_row | 10000000261 | 1 | 16336 | 1572876378 | 2 | 1000 | 1500 | 12 | 1 + test_oversized_row | 10000000261 | 2 | 1572895424 | 104858426 | 2 | 1000 | 100 | 1 | 1501 +(2 rows) + +SELECT COUNT(*) FROM test_oversized_row; + count +--------------------------------------------------------------------- + 1600 +(1 row) + +SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10; + id | length +--------------------------------------------------------------------- + 1 | 1048576 + 2 | 1048576 + 3 | 1048576 + 4 | 1048576 + 5 | 1048576 + 6 | 1048576 + 7 | 1048576 + 8 | 1048576 + 9 | 1048576 + 10 | 1048576 +(10 rows) + +\dt+ test_oversized_row + List 
of relations + Schema | Name | Type | Owner | Persistence | Size | Description +--------------------------------------------------------------------- + columnar_chunk_test | test_oversized_row | table | postgres | permanent | 1605 MB | +(1 row) + +-- test edge case setting chunk_group_size_limit = 1024 +DROP TABLE test_oversized_row; +SET client_min_messages TO debug1; +SET columnar.compression TO default; +CREATE TABLE test_oversized_row ( + id INTEGER, + huge_text TEXT +) USING columnar WITH ( + columnar.chunk_group_row_limit = 1000, + columnar.stripe_row_limit = 5000, + columnar.chunk_group_size_limit = 1024 +); +INSERT INTO test_oversized_row +SELECT gs, repeat('Y', 2*1024*1024) -- 2 MB text +FROM generate_series(1, 600) AS gs; +DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (1073741824 bytes), storing in a separate chunk group +DEBUG: Flushing Stripe of size 600 +-- test VACUUM FULL +VACUUM FULL test_oversized_row; +DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (1073741824 bytes), storing in a separate chunk group +DEBUG: Flushing Stripe of size 600 +SET client_min_messages TO warning; +-- try verifying the data integrity +SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass; + relation | storage_id | stripe_num | chunk_group_num | row_count +--------------------------------------------------------------------- + test_oversized_row | 10000000263 | 1 | 0 | 510 + test_oversized_row | 10000000263 | 1 | 1 | 90 +(2 rows) + +SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass; + relation | storage_id | stripe_num | file_offset | data_length | column_count | chunk_row_count | row_count | chunk_group_count | first_row_number +--------------------------------------------------------------------- + test_oversized_row | 10000000263 | 1 | 16336 | 49278 | 2 | 1000 | 600 | 2 | 1 +(1 row) + +SELECT COUNT(*) FROM test_oversized_row; + count 
+--------------------------------------------------------------------- + 600 +(1 row) + +SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10; + id | length +--------------------------------------------------------------------- + 1 | 2097152 + 2 | 2097152 + 3 | 2097152 + 4 | 2097152 + 5 | 2097152 + 6 | 2097152 + 7 | 2097152 + 8 | 2097152 + 9 | 2097152 + 10 | 2097152 +(10 rows) + +\dt+ test_oversized_row + List of relations + Schema | Name | Type | Owner | Persistence | Size | Description +--------------------------------------------------------------------- + columnar_chunk_test | test_oversized_row | table | postgres | permanent | 72 kB | +(1 row) + +DROP TABLE test_oversized_row; +DROP SCHEMA columnar_chunk_test CASCADE; diff --git a/src/test/regress/expected/columnar_empty.out b/src/test/regress/expected/columnar_empty.out index ed5742087..706f0e455 100644 --- a/src/test/regress/expected/columnar_empty.out +++ b/src/test/regress/expected/columnar_empty.out @@ -8,9 +8,9 @@ ALTER TABLE t_compressed SET (columnar.compression = pglz); ALTER TABLE t_compressed SET (columnar.stripe_row_limit = 2000); ALTER TABLE t_compressed SET (columnar.chunk_group_row_limit = 1000); SELECT * FROM columnar.options WHERE relation = 't_compressed'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - t_compressed | 1000 | 2000 | pglz | 3 + t_compressed | 1000 | 256 | 2000 | pglz | 3 (1 row) -- select diff --git a/src/test/regress/expected/columnar_matview.out b/src/test/regress/expected/columnar_matview.out index 2b741273e..5324b3216 100644 --- a/src/test/regress/expected/columnar_matview.out +++ b/src/test/regress/expected/columnar_matview.out @@ -26,27 +26,27 @@ SELECT * FROM t_view a ORDER BY a; -- show columnar options for 
materialized view SELECT * FROM columnar.options WHERE relation = 't_view'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - t_view | 10000 | 150000 | none | 3 + t_view | 10000 | 256 | 150000 | none | 3 (1 row) -- show we can set options on a materialized view ALTER TABLE t_view SET (columnar.compression = pglz); SELECT * FROM columnar.options WHERE relation = 't_view'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - t_view | 10000 | 150000 | pglz | 3 + t_view | 10000 | 256 | 150000 | pglz | 3 (1 row) REFRESH MATERIALIZED VIEW t_view; -- verify options have not been changed SELECT * FROM columnar.options WHERE relation = 't_view'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - t_view | 10000 | 150000 | pglz | 3 + t_view | 10000 | 256 | 150000 | pglz | 3 (1 row) SELECT * FROM t_view a ORDER BY a; diff --git a/src/test/regress/expected/columnar_permissions.out b/src/test/regress/expected/columnar_permissions.out index 7f9e4e2c6..5cc51d98d 100644 --- a/src/test/regress/expected/columnar_permissions.out +++ b/src/test/regress/expected/columnar_permissions.out @@ -88,11 +88,11 @@ ERROR: must be owner of table no_access select alter_columnar_table_reset('no_access', chunk_group_row_limit => true); ERROR: must be owner of table no_access CONTEXT: SQL statement "ALTER TABLE 
no_access RESET (columnar.chunk_group_row_limit)" -PL/pgSQL function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean) line XX at EXECUTE +PL/pgSQL function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean,boolean) line XX at EXECUTE select alter_columnar_table_set('no_access', chunk_group_row_limit => 1111); ERROR: must be owner of table no_access CONTEXT: SQL statement "ALTER TABLE no_access SET (columnar.chunk_group_row_limit=1111)" -PL/pgSQL function alter_columnar_table_set(regclass,integer,integer,name,integer) line XX at EXECUTE +PL/pgSQL function alter_columnar_table_set(regclass,integer,integer,integer,name,integer) line XX at EXECUTE \c - :current_user -- should see tuples from both columnar_permissions and no_access select relation, chunk_group_row_limit, stripe_row_limit, compression, compression_level diff --git a/src/test/regress/expected/columnar_pg15.out b/src/test/regress/expected/columnar_pg15.out index 62d2de2dc..e901cbd0a 100644 --- a/src/test/regress/expected/columnar_pg15.out +++ b/src/test/regress/expected/columnar_pg15.out @@ -1,7 +1,7 @@ CREATE TABLE alter_am(i int); INSERT INTO alter_am SELECT generate_series(1,1000000); SELECT * FROM columnar.options WHERE relation = 'alter_am'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- (0 rows) @@ -15,9 +15,9 @@ ALTER TABLE alter_am SET ACCESS METHOD columnar, SET (columnar.compression = pglz, fillfactor = 20); SELECT * FROM columnar.options WHERE relation = 'alter_am'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level 
--------------------------------------------------------------------- - alter_am | 10000 | 150000 | pglz | 3 + alter_am | 10000 | 256 | 150000 | pglz | 3 (1 row) SELECT SUM(i) FROM alter_am; @@ -29,7 +29,7 @@ SELECT SUM(i) FROM alter_am; ALTER TABLE alter_am SET ACCESS METHOD heap; -- columnar options should be gone SELECT * FROM columnar.options WHERE relation = 'alter_am'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- (0 rows) diff --git a/src/test/regress/expected/columnar_tableoptions.out b/src/test/regress/expected/columnar_tableoptions.out index dd85c715c..58d9a862d 100644 --- a/src/test/regress/expected/columnar_tableoptions.out +++ b/src/test/regress/expected/columnar_tableoptions.out @@ -6,9 +6,9 @@ INSERT INTO table_options SELECT generate_series(1,100); -- show table_options settings SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - table_options | 10000 | 150000 | none | 3 + table_options | 10000 | 256 | 150000 | none | 3 (1 row) -- test changing the compression @@ -16,9 +16,9 @@ ALTER TABLE table_options SET (columnar.compression = pglz); -- show table_options settings SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - table_options | 
10000 | 150000 | pglz | 3 + table_options | 10000 | 256 | 150000 | pglz | 3 (1 row) -- test changing the compression level @@ -26,9 +26,9 @@ ALTER TABLE table_options SET (columnar.compression_level = 5); -- show table_options settings SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - table_options | 10000 | 150000 | pglz | 5 + table_options | 10000 | 256 | 150000 | pglz | 5 (1 row) -- test changing the chunk_group_row_limit @@ -36,9 +36,9 @@ ALTER TABLE table_options SET (columnar.chunk_group_row_limit = 2000); -- show table_options settings SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - table_options | 2000 | 150000 | pglz | 5 + table_options | 2000 | 256 | 150000 | pglz | 5 (1 row) -- test changing the chunk_group_row_limit @@ -46,9 +46,19 @@ ALTER TABLE table_options SET (columnar.stripe_row_limit = 4000); -- show table_options settings SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - table_options | 2000 | 4000 | pglz | 5 + table_options | 2000 | 256 | 4000 | pglz | 5 +(1 row) + +-- test changing the chunk_group_size_limit +ALTER TABLE table_options SET 
(columnar.chunk_group_size_limit = 512); +-- show table_options settings +SELECT * FROM columnar.options +WHERE relation = 'table_options'::regclass; + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level +--------------------------------------------------------------------- + table_options | 2000 | 512 | 4000 | pglz | 5 (1 row) -- VACUUM FULL creates a new table, make sure it copies settings from the table you are vacuuming @@ -56,23 +66,24 @@ VACUUM FULL table_options; -- show table_options settings SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - table_options | 2000 | 4000 | pglz | 5 + table_options | 2000 | 512 | 4000 | pglz | 5 (1 row) -- set all settings at the same time ALTER TABLE table_options SET (columnar.stripe_row_limit = 8000, columnar.chunk_group_row_limit = 4000, + columnar.chunk_group_size_limit = 128, columnar.compression = none, columnar.compression_level = 7); -- show table_options settings SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - table_options | 4000 | 8000 | none | 7 + table_options | 4000 | 128 | 8000 | none | 7 (1 row) -- make sure table options are not changed when VACUUM a table @@ -80,9 +91,9 @@ VACUUM table_options; -- show table_options settings SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | 
compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - table_options | 4000 | 8000 | none | 7 + table_options | 4000 | 128 | 8000 | none | 7 (1 row) -- make sure table options are not changed when VACUUM FULL a table @@ -90,9 +101,9 @@ VACUUM FULL table_options; -- show table_options settings SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - table_options | 4000 | 8000 | none | 7 + table_options | 4000 | 128 | 8000 | none | 7 (1 row) -- make sure table options are not changed when truncating a table @@ -100,94 +111,106 @@ TRUNCATE table_options; -- show table_options settings SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - table_options | 4000 | 8000 | none | 7 + table_options | 4000 | 128 | 8000 | none | 7 (1 row) ALTER TABLE table_options ALTER COLUMN a TYPE bigint; -- show table_options settings SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - table_options | 4000 | 8000 | none | 7 + table_options | 4000 | 128 | 8000 | none 
| 7 (1 row) -- reset settings one by one to the version of the GUC's SET columnar.chunk_group_row_limit TO 1000; SET columnar.stripe_row_limit TO 10000; +SET columnar.chunk_group_size_limit TO 640; SET columnar.compression TO 'pglz'; SET columnar.compression_level TO 11; -- verify setting the GUC's didn't change the settings -- show table_options settings SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - table_options | 4000 | 8000 | none | 7 + table_options | 4000 | 128 | 8000 | none | 7 (1 row) ALTER TABLE table_options RESET (columnar.chunk_group_row_limit); -- show table_options settings SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - table_options | 1000 | 8000 | none | 7 + table_options | 1000 | 128 | 8000 | none | 7 (1 row) ALTER TABLE table_options RESET (columnar.stripe_row_limit); -- show table_options settings SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - table_options | 1000 | 10000 | none | 7 + table_options | 1000 | 128 | 10000 | none | 7 +(1 row) + +ALTER TABLE table_options RESET (columnar.chunk_group_size_limit); +-- show table_options settings +SELECT * FROM 
columnar.options +WHERE relation = 'table_options'::regclass; + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level +--------------------------------------------------------------------- + table_options | 1000 | 640 | 10000 | none | 7 (1 row) ALTER TABLE table_options RESET (columnar.compression); -- show table_options settings SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - table_options | 1000 | 10000 | pglz | 7 + table_options | 1000 | 640 | 10000 | pglz | 7 (1 row) ALTER TABLE table_options RESET (columnar.compression_level); -- show table_options settings SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - table_options | 1000 | 10000 | pglz | 11 + table_options | 1000 | 640 | 10000 | pglz | 11 (1 row) -- verify resetting all settings at once work SET columnar.chunk_group_row_limit TO 10000; SET columnar.stripe_row_limit TO 100000; +SET columnar.chunk_group_size_limit TO 768; SET columnar.compression TO 'none'; SET columnar.compression_level TO 13; -- show table_options settings SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- 
- table_options | 1000 | 10000 | pglz | 11 + table_options | 1000 | 640 | 10000 | pglz | 11 (1 row) ALTER TABLE table_options RESET (columnar.chunk_group_row_limit, columnar.stripe_row_limit, + columnar.chunk_group_size_limit, columnar.compression, columnar.compression_level); -- show table_options settings SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - table_options | 10000 | 100000 | none | 13 + table_options | 10000 | 768 | 100000 | none | 13 (1 row) -- verify edge cases @@ -234,6 +257,12 @@ HINT: chunk group row count limit must be between 1000 and 100000 ALTER TABLE table_options SET (columnar.chunk_group_row_limit = 0); ERROR: chunk group row count limit out of range HINT: chunk group row count limit must be between 1000 and 100000 +ALTER TABLE table_options SET (columnar.chunk_group_size_limit = 1025); +ERROR: chunk group size limit out of range +HINT: chunk group size limit must be between 1 and 1024 +ALTER TABLE table_options SET (columnar.chunk_group_size_limit = 0); +ERROR: chunk group size limit out of range +HINT: chunk group size limit must be between 1 and 1024 INSERT INTO table_options VALUES (1); -- multiple SET/RESET clauses ALTER TABLE table_options @@ -241,9 +270,9 @@ ALTER TABLE table_options SET (columnar.compression_level = 6); SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - table_options | 10000 | 100000 | pglz | 6 + table_options | 10000 | 768 | 
100000 | pglz | 6 (1 row) ALTER TABLE table_options @@ -252,9 +281,9 @@ ALTER TABLE table_options SET (columnar.chunk_group_row_limit = 5555); SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - table_options | 5555 | 100000 | pglz | 6 + table_options | 5555 | 768 | 100000 | pglz | 6 (1 row) -- a no-op; shouldn't throw an error @@ -272,9 +301,9 @@ SELECT alter_columnar_table_reset('table_options', compression => true); (1 row) SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - table_options | 5555 | 100000 | none | 6 + table_options | 5555 | 768 | 100000 | none | 6 (1 row) SELECT alter_columnar_table_set('table_options', compression_level => 1); @@ -284,9 +313,9 @@ SELECT alter_columnar_table_set('table_options', compression_level => 1); (1 row) SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- - table_options | 5555 | 100000 | none | 1 + table_options | 5555 | 768 | 100000 | none | 1 (1 row) -- error: set columnar options on heap tables @@ -303,7 +332,7 @@ DROP TABLE heap_options; DROP TABLE table_options; -- we expect no entries in çstore.options for anything not found int pg_class SELECT * 
FROM columnar.options o WHERE o.relation NOT IN (SELECT oid FROM pg_class); - relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level + relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- (0 rows) diff --git a/src/test/regress/sql/columnar_chunk_sizes.sql b/src/test/regress/sql/columnar_chunk_sizes.sql new file mode 100644 index 000000000..65b784e04 --- /dev/null +++ b/src/test/regress/sql/columnar_chunk_sizes.sql @@ -0,0 +1,78 @@ +CREATE SCHEMA columnar_chunk_test; +SET search_path TO columnar_chunk_test; +SET columnar.compression TO 'none'; + +-- set to debug1 to see how many new chunks has been created during +-- chunk_group_size_limit overflow + +SET client_min_messages TO debug1; + + +-- +-- ISSUE_6420 +-- +-- Issue: Automatically allocate a new chunk group instead of throwing error due to buffer size limits +-- Link: https://github.com/citusdata/citus/issues/6420 +-- +-- Insert rows that exceeds the chunk group size limit. +-- Adding 600 rows each with the size of 2MB will eventually exceeds the +-- limit of 1GB for enlargeStringInfo() but this should not fail. 
+-- Also setting chunk_group_size_limit to will exceed the max chunk groups limit 5000/1000 = 5, new +-- chunkgroup should be allocated automatically + +CREATE TABLE test_oversized_row ( + id INTEGER, + huge_text TEXT +) USING columnar WITH ( + columnar.chunk_group_row_limit = 1000, + columnar.stripe_row_limit = 1500, + columnar.chunk_group_size_limit = 128 +); + +-- test with chunk & stripe row limit reached +INSERT INTO test_oversized_row +SELECT gs, repeat('Y', 1*1024*1024) -- 1 MB text +FROM generate_series(1, 1600) AS gs; + +SET client_min_messages TO warning; + +-- try verifying the data integrity +SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass; +SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass; +SELECT COUNT(*) FROM test_oversized_row; +SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10; +\dt+ test_oversized_row + + +-- test edge case setting chunk_group_size_limit = 1024 +DROP TABLE test_oversized_row; +SET client_min_messages TO debug1; +SET columnar.compression TO default; + +CREATE TABLE test_oversized_row ( + id INTEGER, + huge_text TEXT +) USING columnar WITH ( + columnar.chunk_group_row_limit = 1000, + columnar.stripe_row_limit = 5000, + columnar.chunk_group_size_limit = 1024 +); + +INSERT INTO test_oversized_row +SELECT gs, repeat('Y', 2*1024*1024) -- 2 MB text +FROM generate_series(1, 600) AS gs; + +-- test VACUUM FULL +VACUUM FULL test_oversized_row; + +SET client_min_messages TO warning; + +-- try verifying the data integrity +SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass; +SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass; +SELECT COUNT(*) FROM test_oversized_row; +SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10; +\dt+ test_oversized_row + +DROP TABLE test_oversized_row; +DROP SCHEMA columnar_chunk_test CASCADE; \ No newline at end of file diff --git 
a/src/test/regress/sql/columnar_tableoptions.sql b/src/test/regress/sql/columnar_tableoptions.sql index 34d882369..aedde51cd 100644 --- a/src/test/regress/sql/columnar_tableoptions.sql +++ b/src/test/regress/sql/columnar_tableoptions.sql @@ -37,6 +37,13 @@ ALTER TABLE table_options SET (columnar.stripe_row_limit = 4000); SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; +-- test changing the chunk_group_size_limit +ALTER TABLE table_options SET (columnar.chunk_group_size_limit = 512); + +-- show table_options settings +SELECT * FROM columnar.options +WHERE relation = 'table_options'::regclass; + -- VACUUM FULL creates a new table, make sure it copies settings from the table you are vacuuming VACUUM FULL table_options; @@ -48,6 +55,7 @@ WHERE relation = 'table_options'::regclass; ALTER TABLE table_options SET (columnar.stripe_row_limit = 8000, columnar.chunk_group_row_limit = 4000, + columnar.chunk_group_size_limit = 128, columnar.compression = none, columnar.compression_level = 7); @@ -81,6 +89,7 @@ WHERE relation = 'table_options'::regclass; -- reset settings one by one to the version of the GUC's SET columnar.chunk_group_row_limit TO 1000; SET columnar.stripe_row_limit TO 10000; +SET columnar.chunk_group_size_limit TO 640; SET columnar.compression TO 'pglz'; SET columnar.compression_level TO 11; @@ -100,6 +109,12 @@ ALTER TABLE table_options RESET (columnar.stripe_row_limit); SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass; +ALTER TABLE table_options RESET (columnar.chunk_group_size_limit); + +-- show table_options settings +SELECT * FROM columnar.options +WHERE relation = 'table_options'::regclass; + ALTER TABLE table_options RESET (columnar.compression); -- show table_options settings @@ -115,6 +130,7 @@ WHERE relation = 'table_options'::regclass; -- verify resetting all settings at once work SET columnar.chunk_group_row_limit TO 10000; SET columnar.stripe_row_limit TO 100000; +SET 
columnar.chunk_group_size_limit TO 768; SET columnar.compression TO 'none'; SET columnar.compression_level TO 13; @@ -125,6 +141,7 @@ WHERE relation = 'table_options'::regclass; ALTER TABLE table_options RESET (columnar.chunk_group_row_limit, columnar.stripe_row_limit, + columnar.chunk_group_size_limit, columnar.compression, columnar.compression_level); @@ -160,6 +177,8 @@ ALTER TABLE table_options SET (columnar.stripe_row_limit = 10000001); ALTER TABLE table_options SET (columnar.chunk_group_row_limit = 999); ALTER TABLE table_options SET (columnar.chunk_group_row_limit = 100001); ALTER TABLE table_options SET (columnar.chunk_group_row_limit = 0); +ALTER TABLE table_options SET (columnar.chunk_group_size_limit = 1025); +ALTER TABLE table_options SET (columnar.chunk_group_size_limit = 0); INSERT INTO table_options VALUES (1); -- multiple SET/RESET clauses