mirror of https://github.com/citusdata/citus.git
Columnar: Support LZ4 compression
parent
260a02180b
commit
3f81ee26fd
|
@ -630,6 +630,7 @@ CITUS_LDFLAGS
|
|||
CITUS_CPPFLAGS
|
||||
CITUS_CFLAGS
|
||||
GIT_BIN
|
||||
with_lz4
|
||||
EGREP
|
||||
GREP
|
||||
CPP
|
||||
|
@ -665,6 +666,7 @@ infodir
|
|||
docdir
|
||||
oldincludedir
|
||||
includedir
|
||||
runstatedir
|
||||
localstatedir
|
||||
sharedstatedir
|
||||
sysconfdir
|
||||
|
@ -689,6 +691,7 @@ ac_user_opts='
|
|||
enable_option_checking
|
||||
with_extra_version
|
||||
enable_coverage
|
||||
with_lz4
|
||||
with_libcurl
|
||||
with_reports_hostname
|
||||
'
|
||||
|
@ -741,6 +744,7 @@ datadir='${datarootdir}'
|
|||
sysconfdir='${prefix}/etc'
|
||||
sharedstatedir='${prefix}/com'
|
||||
localstatedir='${prefix}/var'
|
||||
runstatedir='${localstatedir}/run'
|
||||
includedir='${prefix}/include'
|
||||
oldincludedir='/usr/include'
|
||||
docdir='${datarootdir}/doc/${PACKAGE_TARNAME}'
|
||||
|
@ -993,6 +997,15 @@ do
|
|||
| -silent | --silent | --silen | --sile | --sil)
|
||||
silent=yes ;;
|
||||
|
||||
-runstatedir | --runstatedir | --runstatedi | --runstated \
|
||||
| --runstate | --runstat | --runsta | --runst | --runs \
|
||||
| --run | --ru | --r)
|
||||
ac_prev=runstatedir ;;
|
||||
-runstatedir=* | --runstatedir=* | --runstatedi=* | --runstated=* \
|
||||
| --runstate=* | --runstat=* | --runsta=* | --runst=* | --runs=* \
|
||||
| --run=* | --ru=* | --r=*)
|
||||
runstatedir=$ac_optarg ;;
|
||||
|
||||
-sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb)
|
||||
ac_prev=sbindir ;;
|
||||
-sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \
|
||||
|
@ -1130,7 +1143,7 @@ fi
|
|||
for ac_var in exec_prefix prefix bindir sbindir libexecdir datarootdir \
|
||||
datadir sysconfdir sharedstatedir localstatedir includedir \
|
||||
oldincludedir docdir infodir htmldir dvidir pdfdir psdir \
|
||||
libdir localedir mandir
|
||||
libdir localedir mandir runstatedir
|
||||
do
|
||||
eval ac_val=\$$ac_var
|
||||
# Remove trailing slashes.
|
||||
|
@ -1283,6 +1296,7 @@ Fine tuning of the installation directories:
|
|||
--sysconfdir=DIR read-only single-machine data [PREFIX/etc]
|
||||
--sharedstatedir=DIR modifiable architecture-independent data [PREFIX/com]
|
||||
--localstatedir=DIR modifiable single-machine data [PREFIX/var]
|
||||
--runstatedir=DIR modifiable per-process data [LOCALSTATEDIR/run]
|
||||
--libdir=DIR object code libraries [EPREFIX/lib]
|
||||
--includedir=DIR C header files [PREFIX/include]
|
||||
--oldincludedir=DIR C header files for non-gcc [/usr/include]
|
||||
|
@ -1319,6 +1333,7 @@ Optional Packages:
|
|||
--without-PACKAGE do not use PACKAGE (same as --with-PACKAGE=no)
|
||||
--with-extra-version=STRING
|
||||
append STRING to version
|
||||
--with-lz4 use lz4
|
||||
--without-libcurl do not use libcurl for anonymous statistics
|
||||
collection
|
||||
--with-reports-hostname=HOSTNAME
|
||||
|
@ -4343,6 +4358,99 @@ if test "$enable_coverage" = yes; then
|
|||
CITUS_LDFLAGS="$CITUS_LDFLAGS --coverage"
|
||||
fi
|
||||
|
||||
#
|
||||
# LZ4
|
||||
#
|
||||
|
||||
|
||||
|
||||
# Check whether --with-lz4 was given.
|
||||
if test "${with_lz4+set}" = set; then :
|
||||
withval=$with_lz4;
|
||||
case $withval in
|
||||
yes)
|
||||
:
|
||||
;;
|
||||
no)
|
||||
:
|
||||
;;
|
||||
*)
|
||||
as_fn_error $? "no argument expected for --with-lz4 option" "$LINENO" 5
|
||||
;;
|
||||
esac
|
||||
|
||||
else
|
||||
with_lz4=no
|
||||
|
||||
fi
|
||||
|
||||
|
||||
|
||||
|
||||
if test "$with_lz4" = yes; then
|
||||
{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for LZ4_compress_default in -llz4" >&5
|
||||
$as_echo_n "checking for LZ4_compress_default in -llz4... " >&6; }
|
||||
if ${ac_cv_lib_lz4_LZ4_compress_default+:} false; then :
|
||||
$as_echo_n "(cached) " >&6
|
||||
else
|
||||
ac_check_lib_save_LIBS=$LIBS
|
||||
LIBS="-llz4 $LIBS"
|
||||
cat confdefs.h - <<_ACEOF >conftest.$ac_ext
|
||||
/* end confdefs.h. */
|
||||
|
||||
/* Override any GCC internal prototype to avoid an error.
|
||||
Use char because int might match the return type of a GCC
|
||||
builtin and then its argument prototype would still apply. */
|
||||
#ifdef __cplusplus
|
||||
extern "C"
|
||||
#endif
|
||||
char LZ4_compress_default ();
|
||||
int
|
||||
main ()
|
||||
{
|
||||
return LZ4_compress_default ();
|
||||
;
|
||||
return 0;
|
||||
}
|
||||
_ACEOF
|
||||
if ac_fn_c_try_link "$LINENO"; then :
|
||||
ac_cv_lib_lz4_LZ4_compress_default=yes
|
||||
else
|
||||
ac_cv_lib_lz4_LZ4_compress_default=no
|
||||
fi
|
||||
rm -f core conftest.err conftest.$ac_objext \
|
||||
conftest$ac_exeext conftest.$ac_ext
|
||||
LIBS=$ac_check_lib_save_LIBS
|
||||
fi
|
||||
{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_lz4_LZ4_compress_default" >&5
|
||||
$as_echo "$ac_cv_lib_lz4_LZ4_compress_default" >&6; }
|
||||
if test "x$ac_cv_lib_lz4_LZ4_compress_default" = xyes; then :
|
||||
cat >>confdefs.h <<_ACEOF
|
||||
#define HAVE_LIBLZ4 1
|
||||
_ACEOF
|
||||
|
||||
LIBS="-llz4 $LIBS"
|
||||
|
||||
else
|
||||
as_fn_error $? "lz4 library not found
|
||||
If you have lz4 installed, see config.log for details on the
|
||||
failure. It is possible the compiler isn't looking in the proper directory.
|
||||
Use --without-lz4 to disable lz4 support." "$LINENO" 5
|
||||
fi
|
||||
|
||||
ac_fn_c_check_header_mongrel "$LINENO" "lz4.h" "ac_cv_header_lz4_h" "$ac_includes_default"
|
||||
if test "x$ac_cv_header_lz4_h" = xyes; then :
|
||||
|
||||
else
|
||||
as_fn_error $? "lz4 header not found
|
||||
If you have lz4 already installed, see config.log for details on the
|
||||
failure. It is possible the compiler isn't looking in the proper directory.
|
||||
Use --without-lz4 to disable lz4 support." "$LINENO" 5
|
||||
fi
|
||||
|
||||
|
||||
fi
|
||||
|
||||
#
|
||||
# libcurl
|
||||
#
|
||||
|
|
19
configure.in
19
configure.in
|
@ -185,6 +185,25 @@ if test "$enable_coverage" = yes; then
|
|||
CITUS_LDFLAGS="$CITUS_LDFLAGS --coverage"
|
||||
fi
|
||||
|
||||
#
|
||||
# LZ4
|
||||
#
|
||||
PGAC_ARG_BOOL(with, lz4, no,
|
||||
[use lz4])
|
||||
AC_SUBST(with_lz4)
|
||||
|
||||
if test "$with_lz4" = yes; then
|
||||
AC_CHECK_LIB(lz4, LZ4_compress_default, [],
|
||||
[AC_MSG_ERROR([lz4 library not found
|
||||
If you have lz4 installed, see config.log for details on the
|
||||
failure. It is possible the compiler isn't looking in the proper directory.
|
||||
Use --without-lz4 to disable lz4 support.])])
|
||||
AC_CHECK_HEADER(lz4.h, [], [AC_MSG_ERROR([lz4 header not found
|
||||
If you have lz4 already installed, see config.log for details on the
|
||||
failure. It is possible the compiler isn't looking in the proper directory.
|
||||
Use --without-lz4 to disable lz4 support.])])
|
||||
fi
|
||||
|
||||
#
|
||||
# libcurl
|
||||
#
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include "utils/guc.h"
|
||||
#include "utils/rel.h"
|
||||
|
||||
#include "citus_version.h"
|
||||
#include "columnar/cstore.h"
|
||||
|
||||
/* Default values for option parameters */
|
||||
|
@ -35,6 +36,9 @@ static const struct config_enum_entry cstore_compression_options[] =
|
|||
{
|
||||
{ "none", COMPRESSION_NONE, false },
|
||||
{ "pglz", COMPRESSION_PG_LZ, false },
|
||||
#if HAVE_LIBLZ4
|
||||
{ "lz4", COMPRESSION_LZ4, false },
|
||||
#endif
|
||||
{ NULL, 0, false }
|
||||
};
|
||||
|
||||
|
@ -81,21 +85,50 @@ cstore_init()
|
|||
}
|
||||
|
||||
|
||||
/* ParseCompressionType converts a string to a compression type. */
|
||||
/*
|
||||
* ParseCompressionType converts a string to a compression type.
|
||||
* For compression algorithms that are invalid or not compiled, it
|
||||
* returns COMPRESSION_TYPE_INVALID.
|
||||
*/
|
||||
CompressionType
|
||||
ParseCompressionType(const char *compressionTypeString)
|
||||
{
|
||||
CompressionType compressionType = COMPRESSION_TYPE_INVALID;
|
||||
Assert(compressionTypeString != NULL);
|
||||
|
||||
if (strncmp(compressionTypeString, COMPRESSION_STRING_NONE, NAMEDATALEN) == 0)
|
||||
for (int compressionIndex = 0;
|
||||
cstore_compression_options[compressionIndex].name != NULL;
|
||||
compressionIndex++)
|
||||
{
|
||||
compressionType = COMPRESSION_NONE;
|
||||
}
|
||||
else if (strncmp(compressionTypeString, COMPRESSION_STRING_PG_LZ, NAMEDATALEN) == 0)
|
||||
{
|
||||
compressionType = COMPRESSION_PG_LZ;
|
||||
const char *compressionName = cstore_compression_options[compressionIndex].name;
|
||||
if (strncmp(compressionTypeString, compressionName, NAMEDATALEN) == 0)
|
||||
{
|
||||
return cstore_compression_options[compressionIndex].val;
|
||||
}
|
||||
}
|
||||
|
||||
return compressionType;
|
||||
return COMPRESSION_TYPE_INVALID;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CompressionTypeStr returns string representation of a compression type.
|
||||
* For compression algorithms that are invalid or not compiled, it
|
||||
* returns NULL.
|
||||
*/
|
||||
const char *
|
||||
CompressionTypeStr(CompressionType requestedType)
|
||||
{
|
||||
for (int compressionIndex = 0;
|
||||
cstore_compression_options[compressionIndex].name != NULL;
|
||||
compressionIndex++)
|
||||
{
|
||||
CompressionType compressionType =
|
||||
cstore_compression_options[compressionIndex].val;
|
||||
if (compressionType == requestedType)
|
||||
{
|
||||
return cstore_compression_options[compressionIndex].name;
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -13,16 +13,13 @@
|
|||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
#if PG_VERSION_NUM >= 90500
|
||||
#include "common/pg_lzcompress.h"
|
||||
#else
|
||||
#include "utils/pg_lzcompress.h"
|
||||
#endif
|
||||
|
||||
#include "citus_version.h"
|
||||
#include "columnar/cstore.h"
|
||||
#include "common/pg_lzcompress.h"
|
||||
|
||||
|
||||
#if PG_VERSION_NUM >= 90500
|
||||
#if HAVE_LIBLZ4
|
||||
#include <lz4.h>
|
||||
#endif
|
||||
|
||||
/*
|
||||
* The information at the start of the compressed data. This description is taken
|
||||
|
@ -44,16 +41,6 @@ typedef struct CStoreCompressHeader
|
|||
#define CSTORE_COMPRESS_SET_RAWSIZE(ptr, len) (((CStoreCompressHeader *) (ptr))->rawsize = \
|
||||
(len))
|
||||
|
||||
#else
|
||||
|
||||
#define CSTORE_COMPRESS_HDRSZ (0)
|
||||
#define CSTORE_COMPRESS_RAWSIZE(ptr) (PGLZ_RAW_SIZE((PGLZ_Header *) buffer->data))
|
||||
#define CSTORE_COMPRESS_RAWDATA(ptr) (((PGLZ_Header *) (ptr)))
|
||||
#define CSTORE_COMPRESS_SET_RAWSIZE(ptr, len) (((CStoreCompressHeader *) (ptr))->rawsize = \
|
||||
(len))
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
/*
|
||||
* CompressBuffer compresses the given buffer with the given compression type
|
||||
|
@ -65,45 +52,70 @@ bool
|
|||
CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer,
|
||||
CompressionType compressionType)
|
||||
{
|
||||
uint64 maximumLength = PGLZ_MAX_OUTPUT(inputBuffer->len) + CSTORE_COMPRESS_HDRSZ;
|
||||
bool compressionResult = false;
|
||||
#if PG_VERSION_NUM >= 90500
|
||||
int32 compressedByteCount = 0;
|
||||
switch (compressionType)
|
||||
{
|
||||
#if HAVE_LIBLZ4
|
||||
case COMPRESSION_LZ4:
|
||||
{
|
||||
int maximumLength = LZ4_compressBound(inputBuffer->len);
|
||||
|
||||
resetStringInfo(outputBuffer);
|
||||
enlargeStringInfo(outputBuffer, maximumLength);
|
||||
|
||||
int compressedSize = LZ4_compress_default(inputBuffer->data,
|
||||
outputBuffer->data,
|
||||
inputBuffer->len, maximumLength);
|
||||
if (compressedSize <= 0)
|
||||
{
|
||||
elog(DEBUG1,
|
||||
"failure in LZ4_compress_default, input size=%d, output size=%d",
|
||||
inputBuffer->len, maximumLength);
|
||||
return false;
|
||||
}
|
||||
|
||||
elog(DEBUG1, "compressed %d bytes to %d bytes", inputBuffer->len,
|
||||
compressedSize);
|
||||
|
||||
outputBuffer->len = compressedSize;
|
||||
return true;
|
||||
}
|
||||
#endif
|
||||
|
||||
if (compressionType != COMPRESSION_PG_LZ)
|
||||
{
|
||||
return false;
|
||||
case COMPRESSION_PG_LZ:
|
||||
{
|
||||
uint64 maximumLength = PGLZ_MAX_OUTPUT(inputBuffer->len) +
|
||||
CSTORE_COMPRESS_HDRSZ;
|
||||
bool compressionResult = false;
|
||||
|
||||
resetStringInfo(outputBuffer);
|
||||
enlargeStringInfo(outputBuffer, maximumLength);
|
||||
|
||||
int32 compressedByteCount = pglz_compress((const char *) inputBuffer->data,
|
||||
inputBuffer->len,
|
||||
CSTORE_COMPRESS_RAWDATA(
|
||||
outputBuffer->data),
|
||||
PGLZ_strategy_always);
|
||||
if (compressedByteCount >= 0)
|
||||
{
|
||||
CSTORE_COMPRESS_SET_RAWSIZE(outputBuffer->data, inputBuffer->len);
|
||||
SET_VARSIZE_COMPRESSED(outputBuffer->data,
|
||||
compressedByteCount + CSTORE_COMPRESS_HDRSZ);
|
||||
compressionResult = true;
|
||||
}
|
||||
|
||||
if (compressionResult)
|
||||
{
|
||||
outputBuffer->len = VARSIZE(outputBuffer->data);
|
||||
}
|
||||
|
||||
return compressionResult;
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
resetStringInfo(outputBuffer);
|
||||
enlargeStringInfo(outputBuffer, maximumLength);
|
||||
|
||||
#if PG_VERSION_NUM >= 90500
|
||||
compressedByteCount = pglz_compress((const char *) inputBuffer->data,
|
||||
inputBuffer->len,
|
||||
CSTORE_COMPRESS_RAWDATA(outputBuffer->data),
|
||||
PGLZ_strategy_always);
|
||||
if (compressedByteCount >= 0)
|
||||
{
|
||||
CSTORE_COMPRESS_SET_RAWSIZE(outputBuffer->data, inputBuffer->len);
|
||||
SET_VARSIZE_COMPRESSED(outputBuffer->data,
|
||||
compressedByteCount + CSTORE_COMPRESS_HDRSZ);
|
||||
compressionResult = true;
|
||||
}
|
||||
#else
|
||||
|
||||
compressionResult = pglz_compress(inputBuffer->data, inputBuffer->len,
|
||||
CSTORE_COMPRESS_RAWDATA(outputBuffer->data),
|
||||
PGLZ_strategy_always);
|
||||
#endif
|
||||
|
||||
if (compressionResult)
|
||||
{
|
||||
outputBuffer->len = VARSIZE(outputBuffer->data);
|
||||
}
|
||||
|
||||
return compressionResult;
|
||||
}
|
||||
|
||||
|
||||
|
@ -112,85 +124,84 @@ CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer,
|
|||
* type. This function returns the buffer as-is when no compression is applied.
|
||||
*/
|
||||
StringInfo
|
||||
DecompressBuffer(StringInfo buffer, CompressionType compressionType)
|
||||
DecompressBuffer(StringInfo buffer,
|
||||
CompressionType compressionType,
|
||||
uint64 decompressedSize)
|
||||
{
|
||||
StringInfo decompressedBuffer = NULL;
|
||||
|
||||
Assert(compressionType == COMPRESSION_NONE || compressionType == COMPRESSION_PG_LZ);
|
||||
|
||||
if (compressionType == COMPRESSION_NONE)
|
||||
{
|
||||
/* in case of no compression, return buffer */
|
||||
decompressedBuffer = buffer;
|
||||
}
|
||||
else if (compressionType == COMPRESSION_PG_LZ)
|
||||
{
|
||||
uint32 compressedDataSize = VARSIZE(buffer->data) - CSTORE_COMPRESS_HDRSZ;
|
||||
uint32 decompressedDataSize = CSTORE_COMPRESS_RAWSIZE(buffer->data);
|
||||
char *decompressedData = NULL;
|
||||
#if PG_VERSION_NUM >= 90500
|
||||
int32 decompressedByteCount = 0;
|
||||
#endif
|
||||
|
||||
if (compressedDataSize + CSTORE_COMPRESS_HDRSZ != buffer->len)
|
||||
{
|
||||
ereport(ERROR, (errmsg("cannot decompress the buffer"),
|
||||
errdetail("Expected %u bytes, but received %u bytes",
|
||||
compressedDataSize, buffer->len)));
|
||||
}
|
||||
|
||||
decompressedData = palloc0(decompressedDataSize);
|
||||
|
||||
#if PG_VERSION_NUM >= 90500
|
||||
|
||||
#if PG_VERSION_NUM >= 120000
|
||||
decompressedByteCount = pglz_decompress(CSTORE_COMPRESS_RAWDATA(buffer->data),
|
||||
compressedDataSize, decompressedData,
|
||||
decompressedDataSize, true);
|
||||
#else
|
||||
decompressedByteCount = pglz_decompress(CSTORE_COMPRESS_RAWDATA(buffer->data),
|
||||
compressedDataSize, decompressedData,
|
||||
decompressedDataSize);
|
||||
#endif
|
||||
|
||||
if (decompressedByteCount < 0)
|
||||
{
|
||||
ereport(ERROR, (errmsg("cannot decompress the buffer"),
|
||||
errdetail("compressed data is corrupted")));
|
||||
}
|
||||
#else
|
||||
pglz_decompress((PGLZ_Header *) buffer->data, decompressedData);
|
||||
#endif
|
||||
|
||||
decompressedBuffer = palloc0(sizeof(StringInfoData));
|
||||
decompressedBuffer->data = decompressedData;
|
||||
decompressedBuffer->len = decompressedDataSize;
|
||||
decompressedBuffer->maxlen = decompressedDataSize;
|
||||
}
|
||||
|
||||
return decompressedBuffer;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CompressionTypeStr returns string representation of a compression type.
|
||||
*/
|
||||
char *
|
||||
CompressionTypeStr(CompressionType type)
|
||||
{
|
||||
switch (type)
|
||||
switch (compressionType)
|
||||
{
|
||||
case COMPRESSION_NONE:
|
||||
{
|
||||
return "none";
|
||||
return buffer;
|
||||
}
|
||||
|
||||
#if HAVE_LIBLZ4
|
||||
case COMPRESSION_LZ4:
|
||||
{
|
||||
StringInfo decompressedBuffer = makeStringInfo();
|
||||
enlargeStringInfo(decompressedBuffer, decompressedSize);
|
||||
|
||||
int lz4DecompressSize = LZ4_decompress_safe(buffer->data,
|
||||
decompressedBuffer->data,
|
||||
buffer->len,
|
||||
decompressedSize);
|
||||
|
||||
if (lz4DecompressSize != decompressedSize)
|
||||
{
|
||||
ereport(ERROR, (errmsg("cannot decompress the buffer"),
|
||||
errdetail("Expected %lu bytes, but received %d bytes",
|
||||
decompressedSize, lz4DecompressSize)));
|
||||
}
|
||||
|
||||
decompressedBuffer->len = decompressedSize;
|
||||
|
||||
return decompressedBuffer;
|
||||
}
|
||||
#endif
|
||||
|
||||
case COMPRESSION_PG_LZ:
|
||||
{
|
||||
return "pglz";
|
||||
StringInfo decompressedBuffer = NULL;
|
||||
uint32 compressedDataSize = VARSIZE(buffer->data) - CSTORE_COMPRESS_HDRSZ;
|
||||
uint32 decompressedDataSize = CSTORE_COMPRESS_RAWSIZE(buffer->data);
|
||||
int32 decompressedByteCount = 0;
|
||||
|
||||
if (compressedDataSize + CSTORE_COMPRESS_HDRSZ != buffer->len)
|
||||
{
|
||||
ereport(ERROR, (errmsg("cannot decompress the buffer"),
|
||||
errdetail("Expected %u bytes, but received %u bytes",
|
||||
compressedDataSize, buffer->len)));
|
||||
}
|
||||
|
||||
char *decompressedData = palloc0(decompressedDataSize);
|
||||
|
||||
#if PG_VERSION_NUM >= 120000
|
||||
decompressedByteCount = pglz_decompress(CSTORE_COMPRESS_RAWDATA(buffer->data),
|
||||
compressedDataSize, decompressedData,
|
||||
decompressedDataSize, true);
|
||||
#else
|
||||
decompressedByteCount = pglz_decompress(CSTORE_COMPRESS_RAWDATA(buffer->data),
|
||||
compressedDataSize, decompressedData,
|
||||
decompressedDataSize);
|
||||
#endif
|
||||
|
||||
if (decompressedByteCount < 0)
|
||||
{
|
||||
ereport(ERROR, (errmsg("cannot decompress the buffer"),
|
||||
errdetail("compressed data is corrupted")));
|
||||
}
|
||||
|
||||
decompressedBuffer = palloc0(sizeof(StringInfoData));
|
||||
decompressedBuffer->data = decompressedData;
|
||||
decompressedBuffer->len = decompressedDataSize;
|
||||
decompressedBuffer->maxlen = decompressedDataSize;
|
||||
|
||||
return decompressedBuffer;
|
||||
}
|
||||
|
||||
default:
|
||||
return "unknown";
|
||||
{
|
||||
ereport(ERROR, (errmsg("unexpected compression type: %d", compressionType)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -456,6 +456,8 @@ LoadColumnBuffers(Relation relation, ColumnChunkSkipNode *chunkSkipNodeArray,
|
|||
|
||||
chunkBuffersArray[chunkIndex]->valueBuffer = rawValueBuffer;
|
||||
chunkBuffersArray[chunkIndex]->valueCompressionType = compressionType;
|
||||
chunkBuffersArray[chunkIndex]->decompressedValueSize =
|
||||
chunkSkipNode->decompressedValueSize;
|
||||
}
|
||||
|
||||
ColumnBuffers *columnBuffers = palloc0(sizeof(ColumnBuffers));
|
||||
|
@ -912,8 +914,10 @@ DeserializeChunkData(StripeBuffers *stripeBuffers, uint64 chunkIndex,
|
|||
columnBuffers->chunkBuffersArray[chunkIndex];
|
||||
|
||||
/* decompress and deserialize current chunk's data */
|
||||
StringInfo valueBuffer = DecompressBuffer(chunkBuffers->valueBuffer,
|
||||
chunkBuffers->valueCompressionType);
|
||||
StringInfo valueBuffer =
|
||||
DecompressBuffer(chunkBuffers->valueBuffer,
|
||||
chunkBuffers->valueCompressionType,
|
||||
chunkBuffers->decompressedValueSize);
|
||||
|
||||
if (chunkBuffers->valueCompressionType != COMPRESSION_NONE)
|
||||
{
|
||||
|
|
|
@ -769,9 +769,23 @@ LogRelationStats(Relation rel, int elevel)
|
|||
chunkCount, droppedChunksWithData);
|
||||
for (int compressionType = 0; compressionType < COMPRESSION_COUNT; compressionType++)
|
||||
{
|
||||
const char *compressionName = CompressionTypeStr(compressionType);
|
||||
|
||||
/* skip if this compression algorithm has not been compiled */
|
||||
if (compressionName == NULL)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
/* skip if no chunks use this compression type */
|
||||
if (compressionStats[compressionType] == 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
appendStringInfo(infoBuf,
|
||||
", %s compressed: %d",
|
||||
CompressionTypeStr(compressionType),
|
||||
compressionName,
|
||||
compressionStats[compressionType]);
|
||||
}
|
||||
appendStringInfoString(infoBuf, "\n");
|
||||
|
|
|
@ -40,6 +40,9 @@
|
|||
/* Define to 1 if you have the `curl' library (-lcurl). */
|
||||
#undef HAVE_LIBCURL
|
||||
|
||||
/* Define to 1 if you have the `lz4' library (-llz4). */
|
||||
#undef HAVE_LIBLZ4
|
||||
|
||||
/* Define to 1 if you have the <memory.h> header file. */
|
||||
#undef HAVE_MEMORY_H
|
||||
|
||||
|
|
|
@ -24,6 +24,9 @@
|
|||
/* Define to 1 if you have the `curl' library (-lcurl). */
|
||||
#undef HAVE_LIBCURL
|
||||
|
||||
/* Define to 1 if you have the `lz4' library (-llz4). */
|
||||
#undef HAVE_LIBLZ4
|
||||
|
||||
/* Base URL for statistics collection and update checks */
|
||||
#undef REPORTS_BASE_URL
|
||||
|
||||
|
|
|
@ -56,6 +56,7 @@ typedef enum
|
|||
COMPRESSION_TYPE_INVALID = -1,
|
||||
COMPRESSION_NONE = 0,
|
||||
COMPRESSION_PG_LZ = 1,
|
||||
COMPRESSION_LZ4 = 2,
|
||||
|
||||
COMPRESSION_COUNT
|
||||
} CompressionType;
|
||||
|
@ -287,8 +288,9 @@ extern void FreeChunkData(ChunkData *chunkData);
|
|||
extern uint64 CStoreTableRowCount(Relation relation);
|
||||
extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer,
|
||||
CompressionType compressionType);
|
||||
extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType);
|
||||
extern char * CompressionTypeStr(CompressionType type);
|
||||
extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType,
|
||||
uint64 decompressedSize);
|
||||
extern const char * CompressionTypeStr(CompressionType type);
|
||||
|
||||
/* cstore_metadata_tables.c */
|
||||
extern void InitColumnarOptions(Oid regclass);
|
||||
|
|
|
@ -14,6 +14,7 @@ test: am_update_delete
|
|||
test: am_copyto
|
||||
test: am_alter
|
||||
test: am_alter_set_type
|
||||
test: am_lz4
|
||||
test: am_rollback
|
||||
test: am_truncate
|
||||
test: am_vacuum
|
||||
|
|
|
@ -21,7 +21,18 @@ SELECT count(*) FROM contestant;
|
|||
0
|
||||
(1 row)
|
||||
|
||||
--
|
||||
-- Utility functions to be used throughout tests
|
||||
--
|
||||
CREATE FUNCTION columnar_relation_storageid(relid oid) RETURNS bigint
|
||||
LANGUAGE C STABLE STRICT
|
||||
AS 'citus', $$columnar_relation_storageid$$;
|
||||
CREATE FUNCTION compression_type_supported(type text) RETURNS boolean
|
||||
AS $$
|
||||
BEGIN
|
||||
EXECUTE 'SET LOCAL columnar.compression TO ' || quote_literal(type);
|
||||
return true;
|
||||
EXCEPTION WHEN invalid_parameter_value THEN
|
||||
return false;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
|
|
@ -72,7 +72,7 @@ storage id: -1
|
|||
total file size: 0, total data size: 0
|
||||
compression rate: 1.00x
|
||||
total row count: 0, stripe count: 0, average rows per stripe: 0
|
||||
chunk count: 0, containing data for dropped columns: 0, none compressed: 0, pglz compressed: 0
|
||||
chunk count: 0, containing data for dropped columns: 0
|
||||
|
||||
vacuum verbose t_uncompressed;
|
||||
INFO: statistics for "t_uncompressed":
|
||||
|
@ -80,7 +80,7 @@ storage id: -1
|
|||
total file size: 0, total data size: 0
|
||||
compression rate: 1.00x
|
||||
total row count: 0, stripe count: 0, average rows per stripe: 0
|
||||
chunk count: 0, containing data for dropped columns: 0, none compressed: 0, pglz compressed: 0
|
||||
chunk count: 0, containing data for dropped columns: 0
|
||||
|
||||
-- vacuum full
|
||||
vacuum full t_compressed;
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
SELECT compression_type_supported('lz4') AS lz4_supported \gset
|
||||
\if :lz4_supported
|
||||
\else
|
||||
\q
|
||||
\endif
|
||||
CREATE SCHEMA am_alz4;
|
||||
SET search_path TO am_alz4;
|
||||
SET columnar.compression TO 'lz4';
|
||||
CREATE TABLE test_lz4 (a int, b text, c int) USING columnar;
|
||||
INSERT INTO test_lz4 SELECT floor(i / 1000), floor(i / 10)::text, 4 FROM generate_series(1, 10000) i;
|
||||
SELECT count(*) FROM test_lz4;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
10000
|
||||
(1 row)
|
||||
|
||||
INSERT INTO test_lz4 SELECT floor(i / 2), floor(i / 10)::text, 5 FROM generate_series(1000, 11000) i;
|
||||
SELECT count(*) FROM test_lz4;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
20001
|
||||
(1 row)
|
||||
|
||||
VACUUM VERBOSE test_lz4;
|
||||
INFO: statistics for "test_lz4":
|
||||
storage id: xxxxx
|
||||
total file size: 73728, total data size: 45729
|
||||
compression rate: 7.16x
|
||||
total row count: 20001, stripe count: 2, average rows per stripe: 10000
|
||||
chunk count: 9, containing data for dropped columns: 0, lz4 compressed: 9
|
||||
|
||||
SELECT DISTINCT * FROM test_lz4 ORDER BY a, b, c LIMIT 5;
|
||||
a | b | c
|
||||
---------------------------------------------------------------------
|
||||
0 | 0 | 4
|
||||
0 | 1 | 4
|
||||
0 | 10 | 4
|
||||
0 | 11 | 4
|
||||
0 | 12 | 4
|
||||
(5 rows)
|
||||
|
||||
-- compare compression rate to pglz
|
||||
SET columnar.compression TO 'pglz';
|
||||
CREATE TABLE test_pglz (LIKE test_lz4) USING columnar;
|
||||
INSERT INTO test_pglz SELECT * FROM test_lz4;
|
||||
VACUUM VERBOSE test_pglz;
|
||||
INFO: statistics for "test_pglz":
|
||||
storage id: xxxxx
|
||||
total file size: 57344, total data size: 35986
|
||||
compression rate: 9.10x
|
||||
total row count: 20001, stripe count: 1, average rows per stripe: 20001
|
||||
chunk count: 9, containing data for dropped columns: 0, none compressed: 3, pglz compressed: 6
|
||||
|
||||
-- Other operations
|
||||
VACUUM FULL test_lz4;
|
||||
ANALYZE test_lz4;
|
||||
SELECT count(DISTINCT test_lz4.*) FROM test_lz4;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
6002
|
||||
(1 row)
|
||||
|
||||
TRUNCATE test_lz4;
|
||||
SELECT count(DISTINCT test_lz4.*) FROM test_lz4;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA am_alz4 CASCADE;
|
|
@ -0,0 +1,4 @@
|
|||
SELECT compression_type_supported('lz4') AS lz4_supported \gset
|
||||
\if :lz4_supported
|
||||
\else
|
||||
\q
|
|
@ -152,7 +152,7 @@ storage id: xxxxx
|
|||
total file size: 122880, total data size: 10754
|
||||
compression rate: 1.00x
|
||||
total row count: 2530, stripe count: 3, average rows per stripe: 843
|
||||
chunk count: 3, containing data for dropped columns: 0, none compressed: 3, pglz compressed: 0
|
||||
chunk count: 3, containing data for dropped columns: 0, none compressed: 3
|
||||
|
||||
INFO: "t": truncated 15 to 5 pages
|
||||
SELECT pg_size_pretty(pg_relation_size('t'));
|
||||
|
@ -259,6 +259,6 @@ storage id: xxxxx
|
|||
total file size: 630784, total data size: 604480
|
||||
compression rate: 33.71x
|
||||
total row count: 1000000, stripe count: 1, average rows per stripe: 1000000
|
||||
chunk count: 30, containing data for dropped columns: 0, none compressed: 0, pglz compressed: 30
|
||||
chunk count: 30, containing data for dropped columns: 0, pglz compressed: 30
|
||||
|
||||
DROP TABLE t;
|
||||
|
|
|
@ -15,7 +15,7 @@ storage id: xxxxx
|
|||
total file size: 24576, total data size: 26
|
||||
compression rate: 1.00x
|
||||
total row count: 3, stripe count: 1, average rows per stripe: 3
|
||||
chunk count: 2, containing data for dropped columns: 0, none compressed: 2, pglz compressed: 0
|
||||
chunk count: 2, containing data for dropped columns: 0, none compressed: 2
|
||||
|
||||
s2: INFO: "test_vacuum_vs_insert": stopping truncate due to conflicting lock request
|
||||
step s2-vacuum:
|
||||
|
|
|
@ -21,7 +21,20 @@ CREATE TABLE contestant_compressed (handle TEXT, birthdate DATE, rating INT,
|
|||
ANALYZE contestant;
|
||||
SELECT count(*) FROM contestant;
|
||||
|
||||
--
|
||||
-- Utility functions to be used throughout tests
|
||||
--
|
||||
|
||||
CREATE FUNCTION columnar_relation_storageid(relid oid) RETURNS bigint
|
||||
LANGUAGE C STABLE STRICT
|
||||
AS 'citus', $$columnar_relation_storageid$$;
|
||||
|
||||
CREATE FUNCTION compression_type_supported(type text) RETURNS boolean
|
||||
AS $$
|
||||
BEGIN
|
||||
EXECUTE 'SET LOCAL columnar.compression TO ' || quote_literal(type);
|
||||
return true;
|
||||
EXCEPTION WHEN invalid_parameter_value THEN
|
||||
return false;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
SELECT compression_type_supported('lz4') AS lz4_supported \gset
|
||||
\if :lz4_supported
|
||||
\else
|
||||
\q
|
||||
\endif
|
||||
|
||||
CREATE SCHEMA am_alz4;
|
||||
SET search_path TO am_alz4;
|
||||
|
||||
SET columnar.compression TO 'lz4';
|
||||
CREATE TABLE test_lz4 (a int, b text, c int) USING columnar;
|
||||
|
||||
INSERT INTO test_lz4 SELECT floor(i / 1000), floor(i / 10)::text, 4 FROM generate_series(1, 10000) i;
|
||||
SELECT count(*) FROM test_lz4;
|
||||
|
||||
INSERT INTO test_lz4 SELECT floor(i / 2), floor(i / 10)::text, 5 FROM generate_series(1000, 11000) i;
|
||||
SELECT count(*) FROM test_lz4;
|
||||
|
||||
VACUUM VERBOSE test_lz4;
|
||||
|
||||
SELECT DISTINCT * FROM test_lz4 ORDER BY a, b, c LIMIT 5;
|
||||
|
||||
-- compare compression rate to pglz
|
||||
SET columnar.compression TO 'pglz';
|
||||
CREATE TABLE test_pglz (LIKE test_lz4) USING columnar;
|
||||
INSERT INTO test_pglz SELECT * FROM test_lz4;
|
||||
|
||||
VACUUM VERBOSE test_pglz;
|
||||
|
||||
-- Other operations
|
||||
VACUUM FULL test_lz4;
|
||||
ANALYZE test_lz4;
|
||||
|
||||
SELECT count(DISTINCT test_lz4.*) FROM test_lz4;
|
||||
|
||||
TRUNCATE test_lz4;
|
||||
|
||||
SELECT count(DISTINCT test_lz4.*) FROM test_lz4;
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA am_alz4 CASCADE;
|
Loading…
Reference in New Issue