pull/8202/merge
Imran Zaheer 2025-12-07 04:11:04 +00:00 committed by GitHub
commit 36bab6322b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 889 additions and 67 deletions

View File

@ -111,6 +111,10 @@ The following options are available:
chunk for _newly-inserted_ data. Existing chunks of data will not be
changed and may have more rows than this maximum value. The default
value is `10000`.
* **columnar.chunk_group_size_limit**: ``<integer>`` - the maximum size in MB per
chunk group for _newly-inserted_ data. Existing chunks of data will not be
changed and may be larger than this maximum value. The default
value is `256`.
View options for all tables with:
@ -125,6 +129,7 @@ following GUCs:
* `columnar.compression_level`
* `columnar.stripe_row_limit`
* `columnar.chunk_group_row_limit`
* `columnar.chunk_group_size_limit`
GUCs only affect newly-created *tables*, not any newly-created
*stripes* on an existing table.

View File

@ -29,6 +29,7 @@
/* Default values for option parameters */
#define DEFAULT_STRIPE_ROW_COUNT 150000
#define DEFAULT_CHUNK_ROW_COUNT 10000
#define DEFAULT_CHUNK_GROUP_SIZE 256 /* 256 MB */
#if HAVE_LIBZSTD
#define DEFAULT_COMPRESSION_TYPE COMPRESSION_ZSTD
@ -41,6 +42,7 @@
int columnar_compression = DEFAULT_COMPRESSION_TYPE;
int columnar_stripe_row_limit = DEFAULT_STRIPE_ROW_COUNT;
int columnar_chunk_group_row_limit = DEFAULT_CHUNK_ROW_COUNT;
int columnar_chunk_group_size_limit = DEFAULT_CHUNK_GROUP_SIZE;
int columnar_compression_level = 3;
static const struct config_enum_entry columnar_compression_options[] =
@ -117,6 +119,19 @@ columnar_init_gucs()
NULL,
NULL,
NULL);
DefineCustomIntVariable("columnar.chunk_group_size_limit",
"Maximum size per chunk group.",
NULL,
&columnar_chunk_group_size_limit,
DEFAULT_CHUNK_GROUP_SIZE,
CHUNK_GROUP_SIZE_MINIMUM,
CHUNK_GROUP_SIZE_MAXIMUM,
PGC_USERSET,
0,
NULL,
NULL,
NULL);
}

View File

@ -270,3 +270,45 @@ DecompressBuffer(StringInfo buffer,
}
}
}
/*
 * GetMaxCompressedLength returns an upper bound on the number of bytes that
 * compressing "size" input bytes with the given compression type can produce.
 *
 * COMPRESSION_NONE yields the input size unchanged. LZ4 and ZSTD delegate to
 * the bound functions of their libraries (only when that support is compiled
 * in), and PGLZ uses PGLZ_MAX_OUTPUT plus COLUMNAR_COMPRESS_HDRSZ. Any other
 * compression type, including one whose support is not compiled in, ends up
 * in the default branch and raises an ERROR.
 */
int
GetMaxCompressedLength(int size, CompressionType compressionType)
{
	int maxCompressedLength = 0;

	Assert(compressionType >= 0 && compressionType < COMPRESSION_COUNT);

	switch (compressionType)
	{
		case COMPRESSION_NONE:
		{
			/* nothing is compressed, so the output is as large as the input */
			maxCompressedLength = size;
			break;
		}

#if HAVE_CITUS_LIBLZ4
		case COMPRESSION_LZ4:
		{
			maxCompressedLength = LZ4_compressBound(size);
			break;
		}
#endif

#if HAVE_LIBZSTD
		case COMPRESSION_ZSTD:
		{
			maxCompressedLength = (int) ZSTD_compressBound(size);
			break;
		}
#endif

		case COMPRESSION_PG_LZ:
		{
			maxCompressedLength =
				(int) (PGLZ_MAX_OUTPUT(size) + COLUMNAR_COMPRESS_HDRSZ);
			break;
		}

		default:
		{
			ereport(ERROR, (errmsg("unexpected compression type: %d", compressionType)));
		}
	}

	return maxCompressedLength;
}

View File

@ -147,13 +147,21 @@ static void CheckStripeMetadataConsistency(StripeMetadata *stripeMetadata);
PG_FUNCTION_INFO_V1(columnar_relation_storageid);
/* constants for columnar.options */
#define Natts_columnar_options 5
/*
* constants for columnar.options
*
* The attnum of chunk_group_size_limit is 6 because this column is not part
* of the original table definition; it is added by ALTER TABLE ... ADD COLUMN
* in the upgrade script citus_columnar--13.2-1--14.0-1.sql, so it
* automatically gets the next attnum, 6.
*/
#define Natts_columnar_options 6
#define Anum_columnar_options_regclass 1
#define Anum_columnar_options_chunk_group_row_limit 2
#define Anum_columnar_options_stripe_row_limit 3
#define Anum_columnar_options_compression_level 4
#define Anum_columnar_options_compression 5
#define Anum_columnar_options_chunk_group_size_limit 6
/* ----------------
* columnar.options definition.
@ -166,6 +174,7 @@ typedef struct FormData_columnar_options
int32 stripe_row_limit;
int32 compressionLevel;
NameData compression;
int32 chunk_group_size_limit;
#ifdef CATALOG_VARLEN /* variable-length fields start here */
#endif
@ -231,6 +240,7 @@ InitColumnarOptions(Oid regclass)
ColumnarOptions defaultOptions = {
.chunkRowCount = columnar_chunk_group_row_limit,
.stripeRowCount = columnar_stripe_row_limit,
.maxChunkSize = columnar_chunk_group_size_limit,
.compressionType = columnar_compression,
.compressionLevel = columnar_compression_level
};
@ -273,6 +283,21 @@ ParseColumnarRelOptions(List *reloptions, ColumnarOptions *options)
(uint64) CHUNK_ROW_COUNT_MAXIMUM)));
}
}
else if (strcmp(elem->defname, "chunk_group_size_limit") == 0)
{
options->maxChunkSize = (elem->arg == NULL) ?
columnar_chunk_group_size_limit : defGetInt64(elem);
if (options->maxChunkSize < CHUNK_GROUP_SIZE_MINIMUM ||
options->maxChunkSize > CHUNK_GROUP_SIZE_MAXIMUM)
{
ereport(ERROR, (errmsg("chunk group size limit out of range"),
errhint("chunk group size limit must be between "
UINT64_FORMAT " and " UINT64_FORMAT,
(uint64) CHUNK_GROUP_SIZE_MINIMUM,
(uint64) CHUNK_GROUP_SIZE_MAXIMUM)));
}
}
else if (strcmp(elem->defname, "stripe_row_limit") == 0)
{
options->stripeRowCount = (elem->arg == NULL) ?
@ -425,6 +450,7 @@ WriteColumnarOptions(Oid regclass, ColumnarOptions *options, bool overwrite)
Int32GetDatum(options->stripeRowCount),
Int32GetDatum(options->compressionLevel),
0, /* to be filled below */
Int32GetDatum(options->maxChunkSize),
};
NameData compressionName = { 0 };
@ -458,6 +484,7 @@ WriteColumnarOptions(Oid regclass, ColumnarOptions *options, bool overwrite)
update[Anum_columnar_options_stripe_row_limit - 1] = true;
update[Anum_columnar_options_compression_level - 1] = true;
update[Anum_columnar_options_compression - 1] = true;
update[Anum_columnar_options_chunk_group_size_limit - 1] = true;
HeapTuple tuple = heap_modify_tuple(heapTuple, tupleDescriptor,
values, nulls, update);
@ -581,6 +608,7 @@ ReadColumnarOptions(Oid regclass, ColumnarOptions *options)
options->chunkRowCount = tupOptions->chunk_group_row_limit;
options->stripeRowCount = tupOptions->stripe_row_limit;
options->maxChunkSize = tupOptions->chunk_group_size_limit;
options->compressionLevel = tupOptions->compressionLevel;
options->compressionType = ParseCompressionType(NameStr(tupOptions->compression));
}
@ -590,6 +618,7 @@ ReadColumnarOptions(Oid regclass, ColumnarOptions *options)
options->compressionType = columnar_compression;
options->stripeRowCount = columnar_stripe_row_limit;
options->chunkRowCount = columnar_chunk_group_row_limit;
options->maxChunkSize = columnar_chunk_group_size_limit;
options->compressionLevel = columnar_compression_level;
}

View File

@ -61,6 +61,14 @@ struct ColumnarWriteState
EmptyStripeReservation *emptyStripeReservation;
ColumnarOptions options;
ChunkData *chunkData;
uint32 currentChunkRowIndex;
uint32 currentChunkIndex;
/*
* accounting for starting a new chunk group when the
* chunk group size limit is reached
*/
Size currentChunkBytes;
List *chunkGroupRowCounts;
@ -73,6 +81,8 @@ struct ColumnarWriteState
StringInfo compressionBuffer;
};
static StripeSkipList * ExpandStripeSkipListChunks(StripeSkipList *stripeSkipList, uint32 newChunkIndex);
static StripeBuffers * ExpandStripeBuffersChunks(StripeBuffers *stripeBuffers, uint32 newChunkIndex);
static StripeBuffers * CreateEmptyStripeBuffers(uint32 stripeMaxRowCount,
uint32 chunkRowCount,
uint32 columnCount);
@ -174,13 +184,16 @@ uint64
ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *columnNulls)
{
uint32 columnIndex = 0;
Size totalRowSize = 0;
StripeBuffers *stripeBuffers = writeState->stripeBuffers;
StripeSkipList *stripeSkipList = writeState->stripeSkipList;
uint32 columnCount = writeState->tupleDescriptor->natts;
ColumnarOptions *options = &writeState->options;
const uint32 chunkRowCount = options->chunkRowCount;
const uint32 maxChunkCount = (options->stripeRowCount / chunkRowCount) + 1;
ChunkData *chunkData = writeState->chunkData;
MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext);
bool shouldSerializeEarly = false;
if (stripeBuffers == NULL)
{
@ -209,10 +222,87 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu
{
chunkData->valueBufferArray[columnIndex] = makeStringInfo();
}
writeState->currentChunkBytes = 0;
writeState->currentChunkIndex = 0;
writeState->currentChunkRowIndex = 0;
/* Ensure maxChunkSize is set with a reasonable default */
Assert(options->maxChunkSize >= CHUNK_GROUP_SIZE_MINIMUM &&
options->maxChunkSize <= CHUNK_GROUP_SIZE_MAXIMUM);
}
uint32 chunkIndex = stripeBuffers->rowCount / chunkRowCount;
uint32 chunkRowIndex = stripeBuffers->rowCount % chunkRowCount;
uint32 chunkIndex = writeState->currentChunkIndex;
uint32 chunkRowIndex = writeState->currentChunkRowIndex;
/*
* Calculate total serialized current row size without actually serializing.
* This uses the same logic as SerializeSingleDatum but only computes sizes.
*/
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
if (!columnNulls[columnIndex])
{
Form_pg_attribute attributeForm =
TupleDescAttr(writeState->tupleDescriptor, columnIndex);
int columnTypeLength = attributeForm->attlen;
char columnTypeAlign = attributeForm->attalign;
uint32 datumLength = att_addlength_datum(0, columnTypeLength, columnValues[columnIndex]);
uint32 datumLengthAligned = att_align_nominal(datumLength, columnTypeAlign);
totalRowSize += (Size) datumLengthAligned;
}
}
/*
* Check if we need to serialize a chunk group earlier due to size limits.
* We also need to account for the worst-case compressed data size, which can
* also exceed the limits.
*/
if (chunkRowIndex > 0)
{
int64 chunkGroupLimit = CHUNK_GROUP_SIZE_MB_TO_BYTES(options->maxChunkSize);
int64 maxCompressedSize = GetMaxCompressedLength(writeState->currentChunkBytes,
writeState->options.compressionType);
shouldSerializeEarly = (maxCompressedSize + totalRowSize > chunkGroupLimit);
}
/*
* If adding the current row would exceed the defined chunk group size limit, we
* put the current row in a separate chunk group and serialize
* all row data before it.
*/
if (shouldSerializeEarly)
{
elog(DEBUG1, "Row size (%zu bytes) exceeds chunk group size limit (%zu bytes), "
"storing in a separate chunk group",
totalRowSize, CHUNK_GROUP_SIZE_MB_TO_BYTES(options->maxChunkSize));
/*
* Before putting the row in a separate chunk we have to allocate space
* for the new chunk if maxChunkCount is reached.
*/
if (chunkIndex + 1 >= maxChunkCount)
{
ExpandStripeBuffersChunks(stripeBuffers, chunkIndex + 1);
ExpandStripeSkipListChunks(stripeSkipList, chunkIndex + 1);
}
/*
* Size limit reached, now serialize up to the last row.
* We make sure not to serialize the current row's data, only the rows up to
* the last one, so we use `chunkRowIndex` instead of `chunkRowIndex + 1`
* in order to skip the current row. The current row will go in the next chunk.
*/
SerializeChunkData(writeState, chunkIndex, chunkRowIndex);
writeState->currentChunkBytes = 0;
/* Adjust the indices after deciding to start a new chunk */
chunkIndex = ++writeState->currentChunkIndex;
chunkRowIndex = writeState->currentChunkRowIndex = 0;
}
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
@ -249,17 +339,29 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu
chunkSkipNode->rowCount++;
}
writeState->currentChunkBytes += totalRowSize;
stripeSkipList->chunkCount = chunkIndex + 1;
/* last row of the chunk is inserted serialize the chunk */
if (chunkRowIndex == chunkRowCount - 1)
{
SerializeChunkData(writeState, chunkIndex, chunkRowCount);
writeState->currentChunkBytes = 0;
writeState->currentChunkIndex++;
writeState->currentChunkRowIndex = 0;
}
uint64 writtenRowNumber = writeState->emptyStripeReservation->stripeFirstRowNumber +
stripeBuffers->rowCount;
stripeBuffers->rowCount++;
/*
* Don't increment when the chunk row limit was reached and a new chunk was
* started; writeState->currentChunkRowIndex is supposed to remain `0`
* in this case.
*/
if (chunkRowIndex != chunkRowCount - 1) writeState->currentChunkRowIndex++;
if (stripeBuffers->rowCount >= options->stripeRowCount)
{
ColumnarFlushPendingWrites(writeState);
@ -318,6 +420,86 @@ ColumnarWritePerTupleContext(ColumnarWriteState *state)
return state->perTupleContext;
}
/*
 * ExpandStripeBuffersChunks grows the chunk buffer array of every column in
 * the given StripeBuffers so that it holds newChunkIndex + 1 entries, and
 * installs a freshly allocated, empty ColumnChunkBuffers at newChunkIndex.
 *
 * Columns whose ColumnBuffers or chunk array is NULL are left untouched.
 * Returns the stripeBuffers pointer that was passed in, or NULL when
 * stripeBuffers or its column array is NULL.
 */
static StripeBuffers *
ExpandStripeBuffersChunks(StripeBuffers *stripeBuffers, uint32 newChunkIndex)
{
	if (stripeBuffers == NULL || stripeBuffers->columnBuffersArray == NULL)
	{
		return NULL;
	}

	uint32 columnCount = stripeBuffers->columnCount;

	for (uint32 column = 0; column < columnCount; column++)
	{
		ColumnBuffers *buffers = stripeBuffers->columnBuffersArray[column];
		bool hasChunkArray = (buffers != NULL && buffers->chunkBuffersArray != NULL);

		if (!hasChunkArray)
		{
			continue;
		}

		/* enlarge the array of chunk pointers so that newChunkIndex is valid */
		buffers->chunkBuffersArray = (ColumnChunkBuffers **)
									 repalloc(buffers->chunkBuffersArray,
											  (newChunkIndex + 1) *
											  sizeof(ColumnChunkBuffers *));

		/* build the empty chunk and place it in the newly available slot */
		ColumnChunkBuffers *addedChunk = palloc0(sizeof(ColumnChunkBuffers));
		addedChunk->existsBuffer = NULL;
		addedChunk->valueBuffer = NULL;
		addedChunk->valueCompressionType = COMPRESSION_NONE;

		buffers->chunkBuffersArray[newChunkIndex] = addedChunk;
	}

	return stripeBuffers;
}
/*
 * ExpandStripeSkipListChunks grows the skip node array of every column in the
 * given StripeSkipList so that it holds newChunkIndex + 1 nodes, and zeroes
 * the node at newChunkIndex.
 *
 * Columns whose skip node array is NULL are left untouched. The chunkCount
 * of the skip list is raised to newChunkIndex + 1 when it is not already
 * larger than newChunkIndex. Returns the stripeSkipList pointer that was
 * passed in, or NULL when stripeSkipList or its node array is NULL.
 */
static StripeSkipList *
ExpandStripeSkipListChunks(StripeSkipList *stripeSkipList, uint32 newChunkIndex)
{
	if (stripeSkipList == NULL || stripeSkipList->chunkSkipNodeArray == NULL)
	{
		return NULL;
	}

	uint32 columnCount = stripeSkipList->columnCount;

	for (uint32 column = 0; column < columnCount; column++)
	{
		ColumnChunkSkipNode *columnNodes = stripeSkipList->chunkSkipNodeArray[column];

		if (columnNodes == NULL)
		{
			continue;
		}

		/* enlarge this column's node array so that newChunkIndex is valid */
		columnNodes = (ColumnChunkSkipNode *)
					  repalloc(columnNodes,
							   (newChunkIndex + 1) * sizeof(ColumnChunkSkipNode));

		/* repalloc does not clear the added space, so zero the new node */
		memset(&columnNodes[newChunkIndex], 0, sizeof(ColumnChunkSkipNode));

		stripeSkipList->chunkSkipNodeArray[column] = columnNodes;
	}

	/* make the skip list's chunk count cover the newly added chunk */
	if (stripeSkipList->chunkCount <= newChunkIndex)
	{
		stripeSkipList->chunkCount = newChunkIndex + 1;
	}

	return stripeSkipList;
}
/*
* CreateEmptyStripeBuffers allocates an empty StripeBuffers structure with the given
@ -404,9 +586,8 @@ FlushStripe(ColumnarWriteState *writeState)
TupleDesc tupleDescriptor = writeState->tupleDescriptor;
uint32 columnCount = tupleDescriptor->natts;
uint32 chunkCount = stripeSkipList->chunkCount;
uint32 chunkRowCount = writeState->options.chunkRowCount;
uint32 lastChunkIndex = stripeBuffers->rowCount / chunkRowCount;
uint32 lastChunkRowCount = stripeBuffers->rowCount % chunkRowCount;
uint32 lastChunkIndex = writeState->currentChunkIndex;
uint32 lastChunkRowCount = writeState->currentChunkRowIndex;
uint64 stripeSize = 0;
uint64 stripeRowCount = stripeBuffers->rowCount;

View File

@ -1,2 +1,139 @@
-- citus_columnar--13.2-1--14.0-1
-- bump version to 14.0-1
-- Support for new column `chunk_group_size_limit`.
-- columnar_internal.options may already contain rows for existing columnar
-- tables, and adding a NOT NULL column without a value for those rows fails
-- with "column ... contains null values". Add the column as nullable,
-- backfill existing rows with the default limit of 256 (MB), and only then
-- enforce NOT NULL. The UPDATE also rewrites every existing row, so the new
-- attribute is physically stored in each tuple.
ALTER TABLE columnar_internal.options ADD COLUMN chunk_group_size_limit int;
UPDATE columnar_internal.options SET chunk_group_size_limit = 256;
ALTER TABLE columnar_internal.options ALTER COLUMN chunk_group_size_limit SET NOT NULL;
-- After adding a new column `chunk_group_size_limit`
-- to table `columnar_internal.options` we have to redefine
-- all the related functions. In this case only the table set
-- and reset functions need to be redefined.
DROP FUNCTION IF EXISTS alter_columnar_table_set, alter_columnar_table_reset;
-- alter_columnar_table_set collects a `columnar.<option>=<value>` assignment
-- for every option argument that is not NULL and runs them as one
-- ALTER TABLE ... SET (...) command. When every option argument is NULL, no
-- command is executed.
CREATE OR REPLACE FUNCTION pg_catalog.alter_columnar_table_set(
    table_name regclass,
    chunk_group_row_limit int DEFAULT NULL,
    stripe_row_limit int DEFAULT NULL,
    chunk_group_size_limit int DEFAULT NULL,
    compression name DEFAULT null,
    compression_level int DEFAULT NULL)
    RETURNS void
    LANGUAGE plpgsql AS
$alter_columnar_table_set$
declare
    assignments TEXT[] := ARRAY[]::TEXT[];
begin
    -- gather the requested assignments in the order the command lists them
    if (chunk_group_row_limit is not null) then
        assignments := array_append(assignments,
            'columnar.chunk_group_row_limit=' || chunk_group_row_limit);
    end if;

    if (stripe_row_limit is not null) then
        assignments := array_append(assignments,
            'columnar.stripe_row_limit=' || stripe_row_limit);
    end if;

    if (chunk_group_size_limit is not null) then
        assignments := array_append(assignments,
            'columnar.chunk_group_size_limit=' || chunk_group_size_limit);
    end if;

    if (compression is not null) then
        assignments := array_append(assignments,
            'columnar.compression=' || compression);
    end if;

    if (compression_level is not null) then
        assignments := array_append(assignments,
            'columnar.compression_level=' || compression_level);
    end if;

    -- nothing was requested, so there is no command to run
    if (cardinality(assignments) = 0) then
        return;
    end if;

    execute 'ALTER TABLE ' || table_name::text || ' SET ('
            || array_to_string(assignments, ', ') || ')';
    return;
end;
$alter_columnar_table_set$;
COMMENT ON FUNCTION pg_catalog.alter_columnar_table_set(
    table_name regclass,
    chunk_group_row_limit int,
    stripe_row_limit int,
    chunk_group_size_limit int,
    compression name,
    compression_level int)
IS 'set one or more options on a columnar table, when set to NULL no change is made';
-- alter_columnar_table_reset collects the `columnar.<option>` name of every
-- option whose flag argument is true and runs them as one
-- ALTER TABLE ... RESET (...) command. When no flag is true, no command is
-- executed.
CREATE OR REPLACE FUNCTION pg_catalog.alter_columnar_table_reset(
    table_name regclass,
    chunk_group_row_limit bool DEFAULT false,
    stripe_row_limit bool DEFAULT false,
    chunk_group_size_limit bool DEFAULT false,
    compression bool DEFAULT false,
    compression_level bool DEFAULT false)
    RETURNS void
    LANGUAGE plpgsql AS
$alter_columnar_table_reset$
declare
    option_names TEXT[] := ARRAY[]::TEXT[];
begin
    -- gather the requested option names in the order the command lists them
    if (chunk_group_row_limit) then
        option_names := array_append(option_names,
            'columnar.chunk_group_row_limit'::text);
    end if;

    if (stripe_row_limit) then
        option_names := array_append(option_names,
            'columnar.stripe_row_limit'::text);
    end if;

    if (chunk_group_size_limit) then
        option_names := array_append(option_names,
            'columnar.chunk_group_size_limit'::text);
    end if;

    if (compression) then
        option_names := array_append(option_names,
            'columnar.compression'::text);
    end if;

    if (compression_level) then
        option_names := array_append(option_names,
            'columnar.compression_level'::text);
    end if;

    -- nothing was requested, so there is no command to run
    if (cardinality(option_names) = 0) then
        return;
    end if;

    execute 'ALTER TABLE ' || table_name::text || ' RESET ('
            || array_to_string(option_names, ', ') || ')';
    return;
end;
$alter_columnar_table_reset$;
COMMENT ON FUNCTION pg_catalog.alter_columnar_table_reset(
    table_name regclass,
    chunk_group_row_limit bool,
    stripe_row_limit bool,
    chunk_group_size_limit bool,
    compression bool,
    compression_level bool)
IS 'reset on or more options on a columnar table to the system defaults';
-- Redefine the view columnar.options, this time including the new column
-- `chunk_group_size_limit`. The view joins columnar_internal.options to
-- pg_class on the relation oid and only returns rows for relations whose
-- owner role passes pg_has_role(..., 'USAGE') for the current user.
DROP VIEW IF EXISTS columnar.options;
CREATE VIEW columnar.options WITH (security_barrier) AS
SELECT regclass AS relation, chunk_group_row_limit, chunk_group_size_limit,
stripe_row_limit, compression, compression_level
FROM columnar_internal.options o, pg_class c
WHERE o.regclass = c.oid
AND pg_has_role(c.relowner, 'USAGE');
COMMENT ON VIEW columnar.options
IS 'Columnar options for tables on which the current user has ownership privileges.';
GRANT SELECT ON columnar.options TO PUBLIC;

View File

@ -1,2 +1,120 @@
-- citus_columnar--14.0-1--13.2-1
-- downgrade version to 13.2-1
-- The view columnar.options selects `chunk_group_size_limit`, so it has to be
-- dropped before the column can be removed; otherwise DROP COLUMN fails
-- because the view depends on the column. The set/reset functions are dropped
-- here as well; both the view and the functions are recreated below without
-- `chunk_group_size_limit`.
DROP VIEW IF EXISTS columnar.options;
DROP FUNCTION IF EXISTS alter_columnar_table_set, alter_columnar_table_reset;
-- Remove column `chunk_group_size_limit`
ALTER TABLE columnar_internal.options DROP COLUMN chunk_group_size_limit;
-- Recreate the view columnar.options without the `chunk_group_size_limit`
-- column. The view joins columnar_internal.options to pg_class on the
-- relation oid and only returns rows for relations whose owner role passes
-- pg_has_role(..., 'USAGE') for the current user.
CREATE VIEW columnar.options WITH (security_barrier) AS
SELECT regclass AS relation, chunk_group_row_limit,
stripe_row_limit, compression, compression_level
FROM columnar_internal.options o, pg_class c
WHERE o.regclass = c.oid
AND pg_has_role(c.relowner, 'USAGE');
COMMENT ON VIEW columnar.options
IS 'Columnar options for tables on which the current user has ownership privileges.';
GRANT SELECT ON columnar.options TO PUBLIC;
-- Recreate alter_columnar_table_set with the signature that has no
-- `chunk_group_size_limit` argument. The function appends a
-- `columnar.<option>=<value>` entry for every argument that is not NULL to an
-- ALTER TABLE ... SET (...) command, separating entries with ', ', and
-- executes the command only when at least one option was given.
CREATE OR REPLACE FUNCTION pg_catalog.alter_columnar_table_set(
table_name regclass,
chunk_group_row_limit int DEFAULT NULL,
stripe_row_limit int DEFAULT NULL,
compression name DEFAULT null,
compression_level int DEFAULT NULL)
RETURNS void
LANGUAGE plpgsql AS
$alter_columnar_table_set$
declare
noop BOOLEAN := true;
cmd TEXT := 'ALTER TABLE ' || table_name::text || ' SET (';
begin
if (chunk_group_row_limit is not null) then
if (not noop) then cmd := cmd || ', '; end if;
cmd := cmd || 'columnar.chunk_group_row_limit=' || chunk_group_row_limit;
noop := false;
end if;
if (stripe_row_limit is not null) then
if (not noop) then cmd := cmd || ', '; end if;
cmd := cmd || 'columnar.stripe_row_limit=' || stripe_row_limit;
noop := false;
end if;
if (compression is not null) then
if (not noop) then cmd := cmd || ', '; end if;
cmd := cmd || 'columnar.compression=' || compression;
noop := false;
end if;
if (compression_level is not null) then
if (not noop) then cmd := cmd || ', '; end if;
cmd := cmd || 'columnar.compression_level=' || compression_level;
noop := false;
end if;
cmd := cmd || ')';
if (not noop) then
execute cmd;
end if;
return;
end;
$alter_columnar_table_set$;
COMMENT ON FUNCTION pg_catalog.alter_columnar_table_set(
table_name regclass,
chunk_group_row_limit int,
stripe_row_limit int,
compression name,
compression_level int)
IS 'set one or more options on a columnar table, when set to NULL no change is made';
-- Recreate alter_columnar_table_reset with the signature that has no
-- `chunk_group_size_limit` argument. The function appends the
-- `columnar.<option>` name of every flag that is true to an
-- ALTER TABLE ... RESET (...) command, separating entries with ', ', and
-- executes the command only when at least one flag was set.
CREATE OR REPLACE FUNCTION pg_catalog.alter_columnar_table_reset(
table_name regclass,
chunk_group_row_limit bool DEFAULT false,
stripe_row_limit bool DEFAULT false,
compression bool DEFAULT false,
compression_level bool DEFAULT false)
RETURNS void
LANGUAGE plpgsql AS
$alter_columnar_table_reset$
declare
noop BOOLEAN := true;
cmd TEXT := 'ALTER TABLE ' || table_name::text || ' RESET (';
begin
if (chunk_group_row_limit) then
if (not noop) then cmd := cmd || ', '; end if;
cmd := cmd || 'columnar.chunk_group_row_limit';
noop := false;
end if;
if (stripe_row_limit) then
if (not noop) then cmd := cmd || ', '; end if;
cmd := cmd || 'columnar.stripe_row_limit';
noop := false;
end if;
if (compression) then
if (not noop) then cmd := cmd || ', '; end if;
cmd := cmd || 'columnar.compression';
noop := false;
end if;
if (compression_level) then
if (not noop) then cmd := cmd || ', '; end if;
cmd := cmd || 'columnar.compression_level';
noop := false;
end if;
cmd := cmd || ')';
if (not noop) then
execute cmd;
end if;
return;
end;
$alter_columnar_table_reset$;
COMMENT ON FUNCTION pg_catalog.alter_columnar_table_reset(
table_name regclass,
chunk_group_row_limit bool,
stripe_row_limit bool,
compression bool,
compression_level bool)
IS 'reset on or more options on a columnar table to the system defaults';

View File

@ -50,6 +50,8 @@
#define CHUNK_ROW_COUNT_MAXIMUM 100000
#define COMPRESSION_LEVEL_MIN 1
#define COMPRESSION_LEVEL_MAX 19
/* bounds, in MB, accepted for the columnar.chunk_group_size_limit setting */
#define CHUNK_GROUP_SIZE_MINIMUM 1
#define CHUNK_GROUP_SIZE_MAXIMUM 1024 /* going beyond 1024 MB makes enlargeStringInfo() run out of memory */
/* Columnar file signature */
#define COLUMNAR_VERSION_MAJOR 2
@ -60,6 +62,7 @@
#define COLUMNAR_POSTSCRIPT_SIZE_LENGTH 1
#define COLUMNAR_POSTSCRIPT_SIZE_MAX 256
#define COLUMNAR_BYTES_PER_PAGE (BLCKSZ - SizeOfPageHeaderData)
/*
 * Convert a chunk group size given in megabytes to bytes. The operand is
 * widened to Size before multiplying, so the arithmetic is carried out in
 * Size rather than in unsigned long, which is only 32 bits wide on LLP64
 * platforms.
 */
#define CHUNK_GROUP_SIZE_MB_TO_BYTES(mb) (((Size) (mb)) * 1024 * 1024)
/*global variables for citus_columnar fake version Y */
#define CITUS_COLUMNAR_INTERNAL_VERSION "11.1-0"
@ -81,6 +84,7 @@ typedef struct ColumnarOptions
{
uint64 stripeRowCount;
uint32 chunkRowCount;
uint32 maxChunkSize;
CompressionType compressionType;
int compressionLevel;
} ColumnarOptions;
@ -229,6 +233,7 @@ typedef struct ColumnarWriteState ColumnarWriteState;
extern int columnar_compression;
extern int columnar_stripe_row_limit;
extern int columnar_chunk_group_row_limit;
extern int columnar_chunk_group_size_limit;
extern int columnar_compression_level;
/* called when the user changes options on the given relation */

View File

@ -30,5 +30,5 @@ extern bool CompressBuffer(StringInfo inputBuffer,
int compressionLevel);
extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType,
uint64 decompressedSize);
extern int GetMaxCompressedLength(int size, CompressionType compressionType);
#endif /* COLUMNAR_COMPRESSION_H */

View File

@ -33,3 +33,4 @@ test: columnar_recursive
test: columnar_transactions
test: columnar_matview
test: columnar_memory
test: columnar_chunk_sizes

View File

@ -0,0 +1,163 @@
CREATE SCHEMA columnar_chunk_test;
SET search_path TO columnar_chunk_test;
SET columnar.compression TO 'none';
-- set to debug1 to see how many new chunks has been created during
-- chunk_group_size_limit overflow
SET client_min_messages TO debug1;
--
-- ISSUE_6420
--
-- Issue: Automatically allocate a new chunk group instead of throwing error due to buffer size limits
-- Link: https://github.com/citusdata/citus/issues/6420
--
-- Insert rows that exceeds the chunk group size limit.
-- Adding 600 rows each with the size of 2MB will eventually exceeds the
-- limit of 1GB for enlargeStringInfo() but this should not fail.
-- Also setting chunk_group_size_limit to will exceed the max chunk groups limit 5000/1000 = 5, new
-- chunkgroup should be allocated automatically
CREATE TABLE test_oversized_row (
id INTEGER,
huge_text TEXT
) USING columnar WITH (
columnar.chunk_group_row_limit = 1000,
columnar.stripe_row_limit = 1500,
columnar.chunk_group_size_limit = 128
);
-- test with chunk & stripe row limit reached
INSERT INTO test_oversized_row
SELECT gs, repeat('Y', 1*1024*1024) -- 1 MB text
FROM generate_series(1, 1600) AS gs;
DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Row size (1048584 bytes) exceeds chunk group size limit (134217728 bytes), storing in a separate chunk group
DEBUG: Flushing Stripe of size 1500
DEBUG: Flushing Stripe of size 100
SET client_min_messages TO warning;
-- try verifying the data integrity
SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass;
relation | storage_id | stripe_num | chunk_group_num | row_count
---------------------------------------------------------------------
test_oversized_row | 10000000261 | 1 | 0 | 127
test_oversized_row | 10000000261 | 1 | 1 | 127
test_oversized_row | 10000000261 | 1 | 2 | 127
test_oversized_row | 10000000261 | 1 | 3 | 127
test_oversized_row | 10000000261 | 1 | 4 | 127
test_oversized_row | 10000000261 | 1 | 5 | 127
test_oversized_row | 10000000261 | 1 | 6 | 127
test_oversized_row | 10000000261 | 1 | 7 | 127
test_oversized_row | 10000000261 | 1 | 8 | 127
test_oversized_row | 10000000261 | 1 | 9 | 127
test_oversized_row | 10000000261 | 1 | 10 | 127
test_oversized_row | 10000000261 | 1 | 11 | 103
test_oversized_row | 10000000261 | 2 | 0 | 100
(13 rows)
SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass;
relation | storage_id | stripe_num | file_offset | data_length | column_count | chunk_row_count | row_count | chunk_group_count | first_row_number
---------------------------------------------------------------------
test_oversized_row | 10000000261 | 1 | 16336 | 1572876378 | 2 | 1000 | 1500 | 12 | 1
test_oversized_row | 10000000261 | 2 | 1572895424 | 104858426 | 2 | 1000 | 100 | 1 | 1501
(2 rows)
SELECT COUNT(*) FROM test_oversized_row;
count
---------------------------------------------------------------------
1600
(1 row)
SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10;
id | length
---------------------------------------------------------------------
1 | 1048576
2 | 1048576
3 | 1048576
4 | 1048576
5 | 1048576
6 | 1048576
7 | 1048576
8 | 1048576
9 | 1048576
10 | 1048576
(10 rows)
\dt+ test_oversized_row
List of relations
Schema | Name | Type | Owner | Persistence | Size | Description
---------------------------------------------------------------------
columnar_chunk_test | test_oversized_row | table | postgres | permanent | 1605 MB |
(1 row)
-- test edge case setting chunk_group_size_limit = 1024
DROP TABLE test_oversized_row;
SET client_min_messages TO debug1;
SET columnar.compression TO default;
CREATE TABLE test_oversized_row (
id INTEGER,
huge_text TEXT
) USING columnar WITH (
columnar.chunk_group_row_limit = 1000,
columnar.stripe_row_limit = 5000,
columnar.chunk_group_size_limit = 1024
);
INSERT INTO test_oversized_row
SELECT gs, repeat('Y', 2*1024*1024) -- 2 MB text
FROM generate_series(1, 600) AS gs;
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (1073741824 bytes), storing in a separate chunk group
DEBUG: Flushing Stripe of size 600
-- test VACUUM FULL
VACUUM FULL test_oversized_row;
DEBUG: Row size (2097160 bytes) exceeds chunk group size limit (1073741824 bytes), storing in a separate chunk group
DEBUG: Flushing Stripe of size 600
SET client_min_messages TO warning;
-- try verifying the data integrity
SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass;
relation | storage_id | stripe_num | chunk_group_num | row_count
---------------------------------------------------------------------
test_oversized_row | 10000000263 | 1 | 0 | 510
test_oversized_row | 10000000263 | 1 | 1 | 90
(2 rows)
SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass;
relation | storage_id | stripe_num | file_offset | data_length | column_count | chunk_row_count | row_count | chunk_group_count | first_row_number
---------------------------------------------------------------------
test_oversized_row | 10000000263 | 1 | 16336 | 49278 | 2 | 1000 | 600 | 2 | 1
(1 row)
SELECT COUNT(*) FROM test_oversized_row;
count
---------------------------------------------------------------------
600
(1 row)
SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10;
id | length
---------------------------------------------------------------------
1 | 2097152
2 | 2097152
3 | 2097152
4 | 2097152
5 | 2097152
6 | 2097152
7 | 2097152
8 | 2097152
9 | 2097152
10 | 2097152
(10 rows)
\dt+ test_oversized_row
List of relations
Schema | Name | Type | Owner | Persistence | Size | Description
---------------------------------------------------------------------
columnar_chunk_test | test_oversized_row | table | postgres | permanent | 72 kB |
(1 row)
DROP TABLE test_oversized_row;
DROP SCHEMA columnar_chunk_test CASCADE;

View File

@ -8,9 +8,9 @@ ALTER TABLE t_compressed SET (columnar.compression = pglz);
ALTER TABLE t_compressed SET (columnar.stripe_row_limit = 2000);
ALTER TABLE t_compressed SET (columnar.chunk_group_row_limit = 1000);
SELECT * FROM columnar.options WHERE relation = 't_compressed'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
t_compressed | 1000 | 2000 | pglz | 3
t_compressed | 1000 | 256 | 2000 | pglz | 3
(1 row)
-- select

View File

@ -26,27 +26,27 @@ SELECT * FROM t_view a ORDER BY a;
-- show columnar options for materialized view
SELECT * FROM columnar.options
WHERE relation = 't_view'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
t_view | 10000 | 150000 | none | 3
t_view | 10000 | 256 | 150000 | none | 3
(1 row)
-- show we can set options on a materialized view
ALTER TABLE t_view SET (columnar.compression = pglz);
SELECT * FROM columnar.options
WHERE relation = 't_view'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
t_view | 10000 | 150000 | pglz | 3
t_view | 10000 | 256 | 150000 | pglz | 3
(1 row)
REFRESH MATERIALIZED VIEW t_view;
-- verify options have not been changed
SELECT * FROM columnar.options
WHERE relation = 't_view'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
t_view | 10000 | 150000 | pglz | 3
t_view | 10000 | 256 | 150000 | pglz | 3
(1 row)
SELECT * FROM t_view a ORDER BY a;

View File

@ -88,11 +88,11 @@ ERROR: must be owner of table no_access
select alter_columnar_table_reset('no_access', chunk_group_row_limit => true);
ERROR: must be owner of table no_access
CONTEXT: SQL statement "ALTER TABLE no_access RESET (columnar.chunk_group_row_limit)"
PL/pgSQL function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean) line XX at EXECUTE
PL/pgSQL function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean,boolean) line XX at EXECUTE
select alter_columnar_table_set('no_access', chunk_group_row_limit => 1111);
ERROR: must be owner of table no_access
CONTEXT: SQL statement "ALTER TABLE no_access SET (columnar.chunk_group_row_limit=1111)"
PL/pgSQL function alter_columnar_table_set(regclass,integer,integer,name,integer) line XX at EXECUTE
PL/pgSQL function alter_columnar_table_set(regclass,integer,integer,integer,name,integer) line XX at EXECUTE
\c - :current_user
-- should see tuples from both columnar_permissions and no_access
select relation, chunk_group_row_limit, stripe_row_limit, compression, compression_level

View File

@ -1,7 +1,7 @@
CREATE TABLE alter_am(i int);
INSERT INTO alter_am SELECT generate_series(1,1000000);
SELECT * FROM columnar.options WHERE relation = 'alter_am'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
(0 rows)
@ -15,9 +15,9 @@ ALTER TABLE alter_am
SET ACCESS METHOD columnar,
SET (columnar.compression = pglz, fillfactor = 20);
SELECT * FROM columnar.options WHERE relation = 'alter_am'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
alter_am | 10000 | 150000 | pglz | 3
alter_am | 10000 | 256 | 150000 | pglz | 3
(1 row)
SELECT SUM(i) FROM alter_am;
@ -29,7 +29,7 @@ SELECT SUM(i) FROM alter_am;
ALTER TABLE alter_am SET ACCESS METHOD heap;
-- columnar options should be gone
SELECT * FROM columnar.options WHERE relation = 'alter_am'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
(0 rows)

View File

@ -6,9 +6,9 @@ INSERT INTO table_options SELECT generate_series(1,100);
-- show table_options settings
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 10000 | 150000 | none | 3
table_options | 10000 | 256 | 150000 | none | 3
(1 row)
-- test changing the compression
@ -16,9 +16,9 @@ ALTER TABLE table_options SET (columnar.compression = pglz);
-- show table_options settings
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 10000 | 150000 | pglz | 3
table_options | 10000 | 256 | 150000 | pglz | 3
(1 row)
-- test changing the compression level
@ -26,9 +26,9 @@ ALTER TABLE table_options SET (columnar.compression_level = 5);
-- show table_options settings
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 10000 | 150000 | pglz | 5
table_options | 10000 | 256 | 150000 | pglz | 5
(1 row)
-- test changing the chunk_group_row_limit
@ -36,9 +36,9 @@ ALTER TABLE table_options SET (columnar.chunk_group_row_limit = 2000);
-- show table_options settings
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 2000 | 150000 | pglz | 5
table_options | 2000 | 256 | 150000 | pglz | 5
(1 row)
-- test changing the stripe_row_limit
@ -46,9 +46,19 @@ ALTER TABLE table_options SET (columnar.stripe_row_limit = 4000);
-- show table_options settings
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 2000 | 4000 | pglz | 5
table_options | 2000 | 256 | 4000 | pglz | 5
(1 row)
-- test changing the chunk_group_size_limit
ALTER TABLE table_options SET (columnar.chunk_group_size_limit = 512);
-- show table_options settings
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 2000 | 512 | 4000 | pglz | 5
(1 row)
-- VACUUM FULL creates a new table, make sure it copies settings from the table you are vacuuming
@ -56,23 +66,24 @@ VACUUM FULL table_options;
-- show table_options settings
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 2000 | 4000 | pglz | 5
table_options | 2000 | 512 | 4000 | pglz | 5
(1 row)
-- set all settings at the same time
ALTER TABLE table_options SET
(columnar.stripe_row_limit = 8000,
columnar.chunk_group_row_limit = 4000,
columnar.chunk_group_size_limit = 128,
columnar.compression = none,
columnar.compression_level = 7);
-- show table_options settings
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 4000 | 8000 | none | 7
table_options | 4000 | 128 | 8000 | none | 7
(1 row)
-- make sure table options are not changed when VACUUM a table
@ -80,9 +91,9 @@ VACUUM table_options;
-- show table_options settings
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 4000 | 8000 | none | 7
table_options | 4000 | 128 | 8000 | none | 7
(1 row)
-- make sure table options are not changed when VACUUM FULL a table
@ -90,9 +101,9 @@ VACUUM FULL table_options;
-- show table_options settings
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 4000 | 8000 | none | 7
table_options | 4000 | 128 | 8000 | none | 7
(1 row)
-- make sure table options are not changed when truncating a table
@ -100,94 +111,106 @@ TRUNCATE table_options;
-- show table_options settings
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 4000 | 8000 | none | 7
table_options | 4000 | 128 | 8000 | none | 7
(1 row)
ALTER TABLE table_options ALTER COLUMN a TYPE bigint;
-- show table_options settings
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 4000 | 8000 | none | 7
table_options | 4000 | 128 | 8000 | none | 7
(1 row)
-- reset settings one by one to the version of the GUC's
SET columnar.chunk_group_row_limit TO 1000;
SET columnar.stripe_row_limit TO 10000;
SET columnar.chunk_group_size_limit TO 640;
SET columnar.compression TO 'pglz';
SET columnar.compression_level TO 11;
-- verify setting the GUC's didn't change the settings
-- show table_options settings
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 4000 | 8000 | none | 7
table_options | 4000 | 128 | 8000 | none | 7
(1 row)
ALTER TABLE table_options RESET (columnar.chunk_group_row_limit);
-- show table_options settings
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 1000 | 8000 | none | 7
table_options | 1000 | 128 | 8000 | none | 7
(1 row)
ALTER TABLE table_options RESET (columnar.stripe_row_limit);
-- show table_options settings
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 1000 | 10000 | none | 7
table_options | 1000 | 128 | 10000 | none | 7
(1 row)
ALTER TABLE table_options RESET (columnar.chunk_group_size_limit);
-- show table_options settings
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 1000 | 640 | 10000 | none | 7
(1 row)
ALTER TABLE table_options RESET (columnar.compression);
-- show table_options settings
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 1000 | 10000 | pglz | 7
table_options | 1000 | 640 | 10000 | pglz | 7
(1 row)
ALTER TABLE table_options RESET (columnar.compression_level);
-- show table_options settings
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 1000 | 10000 | pglz | 11
table_options | 1000 | 640 | 10000 | pglz | 11
(1 row)
-- verify resetting all settings at once work
SET columnar.chunk_group_row_limit TO 10000;
SET columnar.stripe_row_limit TO 100000;
SET columnar.chunk_group_size_limit TO 768;
SET columnar.compression TO 'none';
SET columnar.compression_level TO 13;
-- show table_options settings
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 1000 | 10000 | pglz | 11
table_options | 1000 | 640 | 10000 | pglz | 11
(1 row)
ALTER TABLE table_options RESET
(columnar.chunk_group_row_limit,
columnar.stripe_row_limit,
columnar.chunk_group_size_limit,
columnar.compression,
columnar.compression_level);
-- show table_options settings
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 10000 | 100000 | none | 13
table_options | 10000 | 768 | 100000 | none | 13
(1 row)
-- verify edge cases
@ -234,6 +257,12 @@ HINT: chunk group row count limit must be between 1000 and 100000
ALTER TABLE table_options SET (columnar.chunk_group_row_limit = 0);
ERROR: chunk group row count limit out of range
HINT: chunk group row count limit must be between 1000 and 100000
ALTER TABLE table_options SET (columnar.chunk_group_size_limit = 1025);
ERROR: chunk group size limit out of range
HINT: chunk group size limit must be between 1 and 1024
ALTER TABLE table_options SET (columnar.chunk_group_size_limit = 0);
ERROR: chunk group size limit out of range
HINT: chunk group size limit must be between 1 and 1024
INSERT INTO table_options VALUES (1);
-- multiple SET/RESET clauses
ALTER TABLE table_options
@ -241,9 +270,9 @@ ALTER TABLE table_options
SET (columnar.compression_level = 6);
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 10000 | 100000 | pglz | 6
table_options | 10000 | 768 | 100000 | pglz | 6
(1 row)
ALTER TABLE table_options
@ -252,9 +281,9 @@ ALTER TABLE table_options
SET (columnar.chunk_group_row_limit = 5555);
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 5555 | 100000 | pglz | 6
table_options | 5555 | 768 | 100000 | pglz | 6
(1 row)
-- a no-op; shouldn't throw an error
@ -272,9 +301,9 @@ SELECT alter_columnar_table_reset('table_options', compression => true);
(1 row)
SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 5555 | 100000 | none | 6
table_options | 5555 | 768 | 100000 | none | 6
(1 row)
SELECT alter_columnar_table_set('table_options', compression_level => 1);
@ -284,9 +313,9 @@ SELECT alter_columnar_table_set('table_options', compression_level => 1);
(1 row)
SELECT * FROM columnar.options WHERE relation = 'table_options'::regclass;
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
table_options | 5555 | 100000 | none | 1
table_options | 5555 | 768 | 100000 | none | 1
(1 row)
-- error: set columnar options on heap tables
@ -303,7 +332,7 @@ DROP TABLE heap_options;
DROP TABLE table_options;
-- we expect no entries in columnar.options for anything not found in pg_class
SELECT * FROM columnar.options o WHERE o.relation NOT IN (SELECT oid FROM pg_class);
relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level
relation | chunk_group_row_limit | chunk_group_size_limit | stripe_row_limit | compression | compression_level
---------------------------------------------------------------------
(0 rows)

View File

@ -0,0 +1,78 @@
CREATE SCHEMA columnar_chunk_test;
SET search_path TO columnar_chunk_test;
SET columnar.compression TO 'none';
-- set to debug1 to see how many new chunks have been created during
-- chunk_group_size_limit overflow
SET client_min_messages TO debug1;
--
-- ISSUE_6420
--
-- Issue: Automatically allocate a new chunk group instead of throwing error due to buffer size limits
-- Link: https://github.com/citusdata/citus/issues/6420
--
-- Insert rows that exceed the chunk group size limit.
-- Adding 600 rows of 2MB each will eventually exceed the
-- 1GB limit of enlargeStringInfo(), but this should not fail.
-- Also, setting chunk_group_size_limit can push the number of chunk groups past
-- the row-based maximum (e.g. 5000/1000 = 5); a new chunk group should be allocated automatically
CREATE TABLE test_oversized_row (
id INTEGER,
huge_text TEXT
) USING columnar WITH (
columnar.chunk_group_row_limit = 1000,
columnar.stripe_row_limit = 1500,
columnar.chunk_group_size_limit = 128
);
-- test with chunk & stripe row limit reached
INSERT INTO test_oversized_row
SELECT gs, repeat('Y', 1*1024*1024) -- 1 MB text
FROM generate_series(1, 1600) AS gs;
SET client_min_messages TO warning;
-- try verifying the data integrity
SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass;
SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass;
SELECT COUNT(*) FROM test_oversized_row;
SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10;
\dt+ test_oversized_row
-- test edge case setting chunk_group_size_limit = 1024
DROP TABLE test_oversized_row;
SET client_min_messages TO debug1;
SET columnar.compression TO default;
CREATE TABLE test_oversized_row (
id INTEGER,
huge_text TEXT
) USING columnar WITH (
columnar.chunk_group_row_limit = 1000,
columnar.stripe_row_limit = 5000,
columnar.chunk_group_size_limit = 1024
);
INSERT INTO test_oversized_row
SELECT gs, repeat('Y', 2*1024*1024) -- 2 MB text
FROM generate_series(1, 600) AS gs;
-- test VACUUM FULL
VACUUM FULL test_oversized_row;
SET client_min_messages TO warning;
-- try verifying the data integrity
SELECT * FROM columnar.chunk_group WHERE relation = 'test_oversized_row'::regclass;
SELECT * FROM columnar.stripe WHERE relation = 'test_oversized_row'::regclass;
SELECT COUNT(*) FROM test_oversized_row;
SELECT ID, LENGTH(huge_text) FROM test_oversized_row ORDER BY id LIMIT 10;
\dt+ test_oversized_row
DROP TABLE test_oversized_row;
DROP SCHEMA columnar_chunk_test CASCADE;

View File

@ -37,6 +37,13 @@ ALTER TABLE table_options SET (columnar.stripe_row_limit = 4000);
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
-- test changing the chunk_group_size_limit
ALTER TABLE table_options SET (columnar.chunk_group_size_limit = 512);
-- show table_options settings
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
-- VACUUM FULL creates a new table, make sure it copies settings from the table you are vacuuming
VACUUM FULL table_options;
@ -48,6 +55,7 @@ WHERE relation = 'table_options'::regclass;
ALTER TABLE table_options SET
(columnar.stripe_row_limit = 8000,
columnar.chunk_group_row_limit = 4000,
columnar.chunk_group_size_limit = 128,
columnar.compression = none,
columnar.compression_level = 7);
@ -81,6 +89,7 @@ WHERE relation = 'table_options'::regclass;
-- reset settings one by one to the version of the GUC's
SET columnar.chunk_group_row_limit TO 1000;
SET columnar.stripe_row_limit TO 10000;
SET columnar.chunk_group_size_limit TO 640;
SET columnar.compression TO 'pglz';
SET columnar.compression_level TO 11;
@ -100,6 +109,12 @@ ALTER TABLE table_options RESET (columnar.stripe_row_limit);
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
ALTER TABLE table_options RESET (columnar.chunk_group_size_limit);
-- show table_options settings
SELECT * FROM columnar.options
WHERE relation = 'table_options'::regclass;
ALTER TABLE table_options RESET (columnar.compression);
-- show table_options settings
@ -115,6 +130,7 @@ WHERE relation = 'table_options'::regclass;
-- verify resetting all settings at once work
SET columnar.chunk_group_row_limit TO 10000;
SET columnar.stripe_row_limit TO 100000;
SET columnar.chunk_group_size_limit TO 768;
SET columnar.compression TO 'none';
SET columnar.compression_level TO 13;
@ -125,6 +141,7 @@ WHERE relation = 'table_options'::regclass;
ALTER TABLE table_options RESET
(columnar.chunk_group_row_limit,
columnar.stripe_row_limit,
columnar.chunk_group_size_limit,
columnar.compression,
columnar.compression_level);
@ -160,6 +177,8 @@ ALTER TABLE table_options SET (columnar.stripe_row_limit = 10000001);
ALTER TABLE table_options SET (columnar.chunk_group_row_limit = 999);
ALTER TABLE table_options SET (columnar.chunk_group_row_limit = 100001);
ALTER TABLE table_options SET (columnar.chunk_group_row_limit = 0);
ALTER TABLE table_options SET (columnar.chunk_group_size_limit = 1025);
ALTER TABLE table_options SET (columnar.chunk_group_size_limit = 0);
INSERT INTO table_options VALUES (1);
-- multiple SET/RESET clauses