mirror of https://github.com/citusdata/citus.git
270 lines
7.1 KiB
C
270 lines
7.1 KiB
C
/*-------------------------------------------------------------------------
|
|
*
|
|
* cstore_compression.c
|
|
*
|
|
* This file contains compression/decompression functions definitions
|
|
* used in cstore_fdw.
|
|
*
|
|
* Copyright (c) 2016, Citus Data, Inc.
|
|
*
|
|
* $Id$
|
|
*
|
|
*-------------------------------------------------------------------------
|
|
*/
|
|
#include "postgres.h"
|
|
|
|
#include "citus_version.h"
|
|
#include "columnar/cstore.h"
|
|
#include "common/pg_lzcompress.h"
|
|
|
|
#if HAVE_LIBLZ4
|
|
#include <lz4.h>
|
|
#endif
|
|
|
|
#if HAVE_LIBZSTD
|
|
#include <zstd.h>
|
|
#endif
|
|
|
|
/*
|
|
* The information at the start of the compressed data. This decription is taken
|
|
* from pg_lzcompress in pre-9.5 version of PostgreSQL.
|
|
*/
|
|
typedef struct CStoreCompressHeader
|
|
{
|
|
int32 vl_len_; /* varlena header (do not touch directly!) */
|
|
int32 rawsize;
|
|
} CStoreCompressHeader;
|
|
|
|
/*
|
|
* Utilities for manipulation of header information for compressed data
|
|
*/
|
|
|
|
#define CSTORE_COMPRESS_HDRSZ ((int32) sizeof(CStoreCompressHeader))
|
|
#define CSTORE_COMPRESS_RAWSIZE(ptr) (((CStoreCompressHeader *) (ptr))->rawsize)
|
|
#define CSTORE_COMPRESS_RAWDATA(ptr) (((char *) (ptr)) + CSTORE_COMPRESS_HDRSZ)
|
|
#define CSTORE_COMPRESS_SET_RAWSIZE(ptr, len) (((CStoreCompressHeader *) (ptr))->rawsize = \
|
|
(len))
|
|
|
|
|
|
/*
|
|
* CompressBuffer compresses the given buffer with the given compression type
|
|
* outputBuffer enlarged to contain compressed data. The function returns true
|
|
* if compression is done, returns false if compression is not done.
|
|
* outputBuffer is valid only if the function returns true.
|
|
*/
|
|
bool
|
|
CompressBuffer(StringInfo inputBuffer,
|
|
StringInfo outputBuffer,
|
|
CompressionType compressionType,
|
|
int compressionLevel)
|
|
{
|
|
switch (compressionType)
|
|
{
|
|
#if HAVE_LIBLZ4
|
|
case COMPRESSION_LZ4:
|
|
{
|
|
int maximumLength = LZ4_compressBound(inputBuffer->len);
|
|
|
|
resetStringInfo(outputBuffer);
|
|
enlargeStringInfo(outputBuffer, maximumLength);
|
|
|
|
int compressedSize = LZ4_compress_default(inputBuffer->data,
|
|
outputBuffer->data,
|
|
inputBuffer->len, maximumLength);
|
|
if (compressedSize <= 0)
|
|
{
|
|
elog(DEBUG1,
|
|
"failure in LZ4_compress_default, input size=%d, output size=%d",
|
|
inputBuffer->len, maximumLength);
|
|
return false;
|
|
}
|
|
|
|
elog(DEBUG1, "compressed %d bytes to %d bytes", inputBuffer->len,
|
|
compressedSize);
|
|
|
|
outputBuffer->len = compressedSize;
|
|
return true;
|
|
}
|
|
#endif
|
|
|
|
#if HAVE_LIBZSTD
|
|
case COMPRESSION_ZSTD:
|
|
{
|
|
int maximumLength = ZSTD_compressBound(inputBuffer->len);
|
|
|
|
resetStringInfo(outputBuffer);
|
|
enlargeStringInfo(outputBuffer, maximumLength);
|
|
|
|
size_t compressedSize = ZSTD_compress(outputBuffer->data,
|
|
outputBuffer->maxlen,
|
|
inputBuffer->data,
|
|
inputBuffer->len,
|
|
compressionLevel);
|
|
|
|
if (ZSTD_isError(compressedSize))
|
|
{
|
|
ereport(WARNING, (errmsg("zstd compression failed"),
|
|
(errdetail("%s", ZSTD_getErrorName(compressedSize)))));
|
|
return false;
|
|
}
|
|
|
|
outputBuffer->len = compressedSize;
|
|
return true;
|
|
}
|
|
#endif
|
|
|
|
case COMPRESSION_PG_LZ:
|
|
{
|
|
uint64 maximumLength = PGLZ_MAX_OUTPUT(inputBuffer->len) +
|
|
CSTORE_COMPRESS_HDRSZ;
|
|
bool compressionResult = false;
|
|
|
|
resetStringInfo(outputBuffer);
|
|
enlargeStringInfo(outputBuffer, maximumLength);
|
|
|
|
int32 compressedByteCount = pglz_compress((const char *) inputBuffer->data,
|
|
inputBuffer->len,
|
|
CSTORE_COMPRESS_RAWDATA(
|
|
outputBuffer->data),
|
|
PGLZ_strategy_always);
|
|
if (compressedByteCount >= 0)
|
|
{
|
|
CSTORE_COMPRESS_SET_RAWSIZE(outputBuffer->data, inputBuffer->len);
|
|
SET_VARSIZE_COMPRESSED(outputBuffer->data,
|
|
compressedByteCount + CSTORE_COMPRESS_HDRSZ);
|
|
compressionResult = true;
|
|
}
|
|
|
|
if (compressionResult)
|
|
{
|
|
outputBuffer->len = VARSIZE(outputBuffer->data);
|
|
}
|
|
|
|
return compressionResult;
|
|
}
|
|
|
|
default:
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/*
|
|
* DecompressBuffer decompresses the given buffer with the given compression
|
|
* type. This function returns the buffer as-is when no compression is applied.
|
|
*/
|
|
StringInfo
|
|
DecompressBuffer(StringInfo buffer,
|
|
CompressionType compressionType,
|
|
uint64 decompressedSize)
|
|
{
|
|
switch (compressionType)
|
|
{
|
|
case COMPRESSION_NONE:
|
|
{
|
|
return buffer;
|
|
}
|
|
|
|
#if HAVE_LIBLZ4
|
|
case COMPRESSION_LZ4:
|
|
{
|
|
StringInfo decompressedBuffer = makeStringInfo();
|
|
enlargeStringInfo(decompressedBuffer, decompressedSize);
|
|
|
|
int lz4DecompressSize = LZ4_decompress_safe(buffer->data,
|
|
decompressedBuffer->data,
|
|
buffer->len,
|
|
decompressedSize);
|
|
|
|
if (lz4DecompressSize != decompressedSize)
|
|
{
|
|
ereport(ERROR, (errmsg("cannot decompress the buffer"),
|
|
errdetail("Expected %lu bytes, but received %d bytes",
|
|
decompressedSize, lz4DecompressSize)));
|
|
}
|
|
|
|
decompressedBuffer->len = decompressedSize;
|
|
|
|
return decompressedBuffer;
|
|
}
|
|
#endif
|
|
|
|
#if HAVE_LIBZSTD
|
|
case COMPRESSION_ZSTD:
|
|
{
|
|
StringInfo decompressedBuffer = makeStringInfo();
|
|
enlargeStringInfo(decompressedBuffer, decompressedSize);
|
|
|
|
size_t zstdDecompressSize = ZSTD_decompress(decompressedBuffer->data,
|
|
decompressedSize,
|
|
buffer->data,
|
|
buffer->len);
|
|
if (ZSTD_isError(zstdDecompressSize))
|
|
{
|
|
ereport(ERROR, (errmsg("zstd decompression failed"),
|
|
(errdetail("%s", ZSTD_getErrorName(
|
|
zstdDecompressSize)))));
|
|
}
|
|
|
|
if (zstdDecompressSize != decompressedSize)
|
|
{
|
|
ereport(ERROR, (errmsg("unexpected decompressed size"),
|
|
errdetail("Expected %ld, received %ld", decompressedSize,
|
|
zstdDecompressSize)));
|
|
}
|
|
|
|
decompressedBuffer->len = decompressedSize;
|
|
|
|
return decompressedBuffer;
|
|
}
|
|
#endif
|
|
|
|
case COMPRESSION_PG_LZ:
|
|
{
|
|
StringInfo decompressedBuffer = NULL;
|
|
uint32 compressedDataSize = VARSIZE(buffer->data) - CSTORE_COMPRESS_HDRSZ;
|
|
uint32 decompressedDataSize = CSTORE_COMPRESS_RAWSIZE(buffer->data);
|
|
int32 decompressedByteCount = 0;
|
|
|
|
if (compressedDataSize + CSTORE_COMPRESS_HDRSZ != buffer->len)
|
|
{
|
|
ereport(ERROR, (errmsg("cannot decompress the buffer"),
|
|
errdetail("Expected %u bytes, but received %u bytes",
|
|
compressedDataSize, buffer->len)));
|
|
}
|
|
|
|
char *decompressedData = palloc0(decompressedDataSize);
|
|
|
|
#if PG_VERSION_NUM >= 120000
|
|
decompressedByteCount = pglz_decompress(CSTORE_COMPRESS_RAWDATA(buffer->data),
|
|
compressedDataSize, decompressedData,
|
|
decompressedDataSize, true);
|
|
#else
|
|
decompressedByteCount = pglz_decompress(CSTORE_COMPRESS_RAWDATA(buffer->data),
|
|
compressedDataSize, decompressedData,
|
|
decompressedDataSize);
|
|
#endif
|
|
|
|
if (decompressedByteCount < 0)
|
|
{
|
|
ereport(ERROR, (errmsg("cannot decompress the buffer"),
|
|
errdetail("compressed data is corrupted")));
|
|
}
|
|
|
|
decompressedBuffer = palloc0(sizeof(StringInfoData));
|
|
decompressedBuffer->data = decompressedData;
|
|
decompressedBuffer->len = decompressedDataSize;
|
|
decompressedBuffer->maxlen = decompressedDataSize;
|
|
|
|
return decompressedBuffer;
|
|
}
|
|
|
|
default:
|
|
{
|
|
ereport(ERROR, (errmsg("unexpected compression type: %d", compressionType)));
|
|
}
|
|
}
|
|
}
|