mirror of https://github.com/citusdata/citus.git
Feature: cstore table options (#25)
DESCRIPTION: Add UDF's to maintain cstore table options This PR adds two UDF's and a view to interact and maintain the cstore table options. - ``alter_cstore_table_set(relid REGCLASS, [ options ... ])`` - ``alter_cstore_table_reset(relid REGCLASS, [ options ... ])`` - ``cstore.cstore_options`` The `set` function takes options and their specific types. When specified it will change the option associated with the table to the provided value. When omitted no action is taken. The `reset` function takes options as booleans. When set to `true` the value of the option associated with the table will be reset to the current default as specified by the associated GUC's. The options view containes a record for every cstore table with its associated settings as columns.merge-cstore-pykello
parent
8909769975
commit
d03e9ca861
2
Makefile
2
Makefile
|
@ -56,7 +56,7 @@ ifeq ($(USE_TABLEAM),yes)
|
|||
OBJS += cstore_tableam.o cstore_customscan.o
|
||||
REGRESS += am_create am_load am_query am_analyze am_data_types am_functions \
|
||||
am_drop am_insert am_copyto am_alter am_rollback am_truncate am_vacuum am_clean \
|
||||
am_block_filtering am_join am_trigger
|
||||
am_block_filtering am_join am_trigger am_tableoptions
|
||||
ISOLATION += am_write_concurrency am_vacuum_vs_insert
|
||||
endif
|
||||
|
||||
|
|
8
cstore.h
8
cstore.h
|
@ -93,6 +93,8 @@ typedef struct DataFileMetadata
|
|||
{
|
||||
List *stripeMetadataList;
|
||||
uint64 blockRowCount;
|
||||
uint64 stripeRowCount;
|
||||
CompressionType compression;
|
||||
} DataFileMetadata;
|
||||
|
||||
|
||||
|
@ -277,10 +279,14 @@ extern uint64 CStoreTableRowCount(Relation relation);
|
|||
extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer,
|
||||
CompressionType compressionType);
|
||||
extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType);
|
||||
extern char * CompressionTypeStr(CompressionType type);
|
||||
|
||||
/* cstore_metadata_tables.c */
|
||||
extern void DeleteDataFileMetadataRowIfExists(Oid relfilenode);
|
||||
extern void InitCStoreDataFileMetadata(Oid relfilenode, int blockRowCount);
|
||||
extern void InitCStoreDataFileMetadata(Oid relfilenode, int blockRowCount, int
|
||||
stripeRowCount, CompressionType compression);
|
||||
extern void UpdateCStoreDataFileMetadata(Oid relfilenode, int blockRowCount, int
|
||||
stripeRowCount, CompressionType compression);
|
||||
extern DataFileMetadata * ReadDataFileMetadata(Oid relfilenode, bool missingOk);
|
||||
extern uint64 GetHighestUsedAddress(Oid relfilenode);
|
||||
extern StripeMetadata ReserveStripe(Relation rel, uint64 size,
|
||||
|
|
|
@ -170,3 +170,27 @@ DecompressBuffer(StringInfo buffer, CompressionType compressionType)
|
|||
|
||||
return decompressedBuffer;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CompressionTypeStr returns string representation of a compression type.
|
||||
*/
|
||||
char *
|
||||
CompressionTypeStr(CompressionType type)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case COMPRESSION_NONE:
|
||||
{
|
||||
return "none";
|
||||
}
|
||||
|
||||
case COMPRESSION_PG_LZ:
|
||||
{
|
||||
return "pglz";
|
||||
}
|
||||
|
||||
default:
|
||||
return "unknown";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,6 +12,24 @@ IF version() ~ '12' or version() ~ '13' THEN
|
|||
|
||||
CREATE ACCESS METHOD cstore_tableam
|
||||
TYPE TABLE HANDLER cstore_tableam_handler;
|
||||
|
||||
CREATE FUNCTION pg_catalog.alter_cstore_table_set(
|
||||
table_name regclass,
|
||||
block_row_count int DEFAULT NULL,
|
||||
stripe_row_count int DEFAULT NULL,
|
||||
compression name DEFAULT null)
|
||||
RETURNS void
|
||||
LANGUAGE C
|
||||
AS 'MODULE_PATHNAME', 'alter_cstore_table_set';
|
||||
|
||||
CREATE FUNCTION pg_catalog.alter_cstore_table_reset(
|
||||
table_name regclass,
|
||||
block_row_count bool DEFAULT false,
|
||||
stripe_row_count bool DEFAULT false,
|
||||
compression bool DEFAULT false)
|
||||
RETURNS void
|
||||
LANGUAGE C
|
||||
AS 'MODULE_PATHNAME', 'alter_cstore_table_reset';
|
||||
$$;
|
||||
END IF;
|
||||
END$proc$;
|
||||
|
|
|
@ -34,6 +34,8 @@ LANGUAGE C STRICT;
|
|||
CREATE TABLE cstore_data_files (
|
||||
relfilenode oid NOT NULL,
|
||||
block_row_count int NOT NULL,
|
||||
stripe_row_count int NOT NULL,
|
||||
compression name NOT NULL,
|
||||
version_major bigint NOT NULL,
|
||||
version_minor bigint NOT NULL,
|
||||
PRIMARY KEY (relfilenode)
|
||||
|
@ -74,3 +76,13 @@ CREATE TABLE cstore_skipnodes (
|
|||
) WITH (user_catalog_table = true);
|
||||
|
||||
COMMENT ON TABLE cstore_skipnodes IS 'CStore per block metadata';
|
||||
|
||||
CREATE VIEW cstore_options AS
|
||||
SELECT c.oid::regclass regclass,
|
||||
d.block_row_count,
|
||||
d.stripe_row_count,
|
||||
d.compression
|
||||
FROM pg_class c
|
||||
JOIN cstore.cstore_data_files d USING(relfilenode);
|
||||
|
||||
COMMENT ON VIEW cstore_options IS 'CStore per table settings';
|
||||
|
|
|
@ -280,7 +280,8 @@ cstore_ddl_event_end_trigger(PG_FUNCTION_ARGS)
|
|||
AccessShareLock, false);
|
||||
Relation relation = cstore_fdw_open(relationId, AccessExclusiveLock);
|
||||
CStoreOptions *options = CStoreGetOptions(relationId);
|
||||
InitCStoreDataFileMetadata(relation->rd_node.relNode, options->blockRowCount);
|
||||
InitCStoreDataFileMetadata(relation->rd_node.relNode, options->blockRowCount,
|
||||
options->stripeRowCount, options->compressionType);
|
||||
heap_close(relation, AccessExclusiveLock);
|
||||
}
|
||||
}
|
||||
|
@ -797,7 +798,8 @@ TruncateCStoreTables(List *cstoreRelationList)
|
|||
Assert(IsCStoreFdwTable(relationId));
|
||||
|
||||
FdwNewRelFileNode(relation);
|
||||
InitCStoreDataFileMetadata(relation->rd_node.relNode, options->blockRowCount);
|
||||
InitCStoreDataFileMetadata(relation->rd_node.relNode, options->blockRowCount,
|
||||
options->stripeRowCount, options->compressionType);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -57,7 +57,7 @@ static Oid CStoreDataFilesIndexRelationId(void);
|
|||
static Oid CStoreSkipNodesRelationId(void);
|
||||
static Oid CStoreSkipNodesIndexRelationId(void);
|
||||
static Oid CStoreNamespaceId(void);
|
||||
static bool ReadCStoreDataFiles(Oid relfilenode, uint64 *blockRowCount);
|
||||
static bool ReadCStoreDataFiles(Oid relfilenode, DataFileMetadata *metadata);
|
||||
static ModifyState * StartModifyRelation(Relation rel);
|
||||
static void InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values,
|
||||
bool *nulls);
|
||||
|
@ -68,11 +68,31 @@ static bytea * DatumToBytea(Datum value, Form_pg_attribute attrForm);
|
|||
static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm);
|
||||
|
||||
/* constants for cstore_table */
|
||||
#define Natts_cstore_data_files 4
|
||||
#define Natts_cstore_data_files 6
|
||||
#define Anum_cstore_data_files_relfilenode 1
|
||||
#define Anum_cstore_data_files_block_row_count 2
|
||||
#define Anum_cstore_data_files_version_major 3
|
||||
#define Anum_cstore_data_files_version_minor 4
|
||||
#define Anum_cstore_data_files_stripe_row_count 3
|
||||
#define Anum_cstore_data_files_compression 4
|
||||
#define Anum_cstore_data_files_version_major 5
|
||||
#define Anum_cstore_data_files_version_minor 6
|
||||
|
||||
/* ----------------
|
||||
* cstore.cstore_data_files definition.
|
||||
* ----------------
|
||||
*/
|
||||
typedef struct FormData_cstore_data_files
|
||||
{
|
||||
Oid relfilenode;
|
||||
int32 block_row_count;
|
||||
int32 stripe_row_count;
|
||||
NameData compression;
|
||||
int64 version_major;
|
||||
int64 version_minor;
|
||||
|
||||
#ifdef CATALOG_VARLEN /* variable-length fields start here */
|
||||
#endif
|
||||
} FormData_cstore_data_files;
|
||||
typedef FormData_cstore_data_files *Form_cstore_data_files;
|
||||
|
||||
/* constants for cstore_stripe */
|
||||
#define Natts_cstore_stripes 8
|
||||
|
@ -106,16 +126,22 @@ static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm);
|
|||
* in cstore_data_files.
|
||||
*/
|
||||
void
|
||||
InitCStoreDataFileMetadata(Oid relfilenode, int blockRowCount)
|
||||
InitCStoreDataFileMetadata(Oid relfilenode, int blockRowCount, int stripeRowCount,
|
||||
CompressionType compression)
|
||||
{
|
||||
Oid cstoreDataFilesOid = InvalidOid;
|
||||
Relation cstoreDataFiles = NULL;
|
||||
ModifyState *modifyState = NULL;
|
||||
NameData compressionName = { 0 };
|
||||
|
||||
namestrcpy(&compressionName, CompressionTypeStr(compression));
|
||||
|
||||
bool nulls[Natts_cstore_data_files] = { 0 };
|
||||
Datum values[Natts_cstore_data_files] = {
|
||||
ObjectIdGetDatum(relfilenode),
|
||||
Int32GetDatum(blockRowCount),
|
||||
Int32GetDatum(stripeRowCount),
|
||||
NameGetDatum(&compressionName),
|
||||
Int32GetDatum(CSTORE_VERSION_MAJOR),
|
||||
Int32GetDatum(CSTORE_VERSION_MINOR)
|
||||
};
|
||||
|
@ -135,6 +161,84 @@ InitCStoreDataFileMetadata(Oid relfilenode, int blockRowCount)
|
|||
}
|
||||
|
||||
|
||||
void
|
||||
UpdateCStoreDataFileMetadata(Oid relfilenode, int blockRowCount, int stripeRowCount,
|
||||
CompressionType compression)
|
||||
{
|
||||
const int scanKeyCount = 1;
|
||||
ScanKeyData scanKey[1];
|
||||
bool indexOK = true;
|
||||
SysScanDesc scanDescriptor = NULL;
|
||||
Form_cstore_data_files metadata = NULL;
|
||||
HeapTuple heapTuple = NULL;
|
||||
Datum values[Natts_cstore_data_files] = { 0 };
|
||||
bool isnull[Natts_cstore_data_files] = { 0 };
|
||||
bool replace[Natts_cstore_data_files] = { 0 };
|
||||
|
||||
Relation cstoreDataFiles = heap_open(CStoreDataFilesRelationId(), RowExclusiveLock);
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(cstoreDataFiles);
|
||||
|
||||
ScanKeyInit(&scanKey[0], Anum_cstore_data_files_relfilenode, BTEqualStrategyNumber,
|
||||
F_INT8EQ, ObjectIdGetDatum(relfilenode));
|
||||
|
||||
scanDescriptor = systable_beginscan(cstoreDataFiles,
|
||||
CStoreDataFilesIndexRelationId(),
|
||||
indexOK,
|
||||
NULL, scanKeyCount, scanKey);
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
if (heapTuple == NULL)
|
||||
{
|
||||
ereport(ERROR, (errmsg("relfilenode %d doesn't belong to a cstore table",
|
||||
relfilenode)));
|
||||
}
|
||||
|
||||
metadata = (Form_cstore_data_files) GETSTRUCT(heapTuple);
|
||||
|
||||
bool changed = false;
|
||||
if (metadata->block_row_count != blockRowCount)
|
||||
{
|
||||
values[Anum_cstore_data_files_block_row_count - 1] = Int32GetDatum(blockRowCount);
|
||||
isnull[Anum_cstore_data_files_block_row_count - 1] = false;
|
||||
replace[Anum_cstore_data_files_block_row_count - 1] = true;
|
||||
changed = true;
|
||||
}
|
||||
|
||||
if (metadata->stripe_row_count != stripeRowCount)
|
||||
{
|
||||
values[Anum_cstore_data_files_stripe_row_count - 1] = Int32GetDatum(
|
||||
stripeRowCount);
|
||||
isnull[Anum_cstore_data_files_stripe_row_count - 1] = false;
|
||||
replace[Anum_cstore_data_files_stripe_row_count - 1] = true;
|
||||
changed = true;
|
||||
}
|
||||
|
||||
if (ParseCompressionType(NameStr(metadata->compression)) != compression)
|
||||
{
|
||||
Name compressionName = palloc0(sizeof(NameData));
|
||||
namestrcpy(compressionName, CompressionTypeStr(compression));
|
||||
values[Anum_cstore_data_files_compression - 1] = NameGetDatum(compressionName);
|
||||
isnull[Anum_cstore_data_files_compression - 1] = false;
|
||||
replace[Anum_cstore_data_files_compression - 1] = true;
|
||||
changed = true;
|
||||
}
|
||||
|
||||
if (changed)
|
||||
{
|
||||
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull,
|
||||
replace);
|
||||
|
||||
CatalogTupleUpdate(cstoreDataFiles, &heapTuple->t_self, heapTuple);
|
||||
|
||||
CommandCounterIncrement();
|
||||
}
|
||||
|
||||
systable_endscan(scanDescriptor);
|
||||
|
||||
heap_close(cstoreDataFiles, NoLock);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SaveStripeSkipList saves StripeSkipList for a given stripe as rows
|
||||
* of cstore_skipnodes.
|
||||
|
@ -355,7 +459,7 @@ DataFileMetadata *
|
|||
ReadDataFileMetadata(Oid relfilenode, bool missingOk)
|
||||
{
|
||||
DataFileMetadata *datafileMetadata = palloc0(sizeof(DataFileMetadata));
|
||||
bool found = ReadCStoreDataFiles(relfilenode, &datafileMetadata->blockRowCount);
|
||||
bool found = ReadCStoreDataFiles(relfilenode, datafileMetadata);
|
||||
if (!found)
|
||||
{
|
||||
if (!missingOk)
|
||||
|
@ -555,7 +659,7 @@ ReadDataFileStripeList(Oid relfilenode, Snapshot snapshot)
|
|||
* false if table was not found in cstore_data_files.
|
||||
*/
|
||||
static bool
|
||||
ReadCStoreDataFiles(Oid relfilenode, uint64 *blockRowCount)
|
||||
ReadCStoreDataFiles(Oid relfilenode, DataFileMetadata *metadata)
|
||||
{
|
||||
bool found = false;
|
||||
Oid cstoreDataFilesOid = InvalidOid;
|
||||
|
@ -599,8 +703,19 @@ ReadCStoreDataFiles(Oid relfilenode, uint64 *blockRowCount)
|
|||
Datum datumArray[Natts_cstore_data_files];
|
||||
bool isNullArray[Natts_cstore_data_files];
|
||||
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
||||
*blockRowCount = DatumGetInt32(datumArray[Anum_cstore_data_files_block_row_count -
|
||||
1]);
|
||||
|
||||
if (metadata)
|
||||
{
|
||||
Name compressionName = NULL;
|
||||
|
||||
metadata->blockRowCount = DatumGetInt32(
|
||||
datumArray[Anum_cstore_data_files_block_row_count - 1]);
|
||||
metadata->stripeRowCount = DatumGetInt32(
|
||||
datumArray[Anum_cstore_data_files_stripe_row_count - 1]);
|
||||
compressionName = DatumGetName(
|
||||
datumArray[Anum_cstore_data_files_compression - 1]);
|
||||
metadata->compression = ParseCompressionType(NameStr(*compressionName));
|
||||
}
|
||||
found = true;
|
||||
}
|
||||
|
||||
|
|
209
cstore_tableam.c
209
cstore_tableam.c
|
@ -97,11 +97,15 @@ static bool IsCStoreTableAmTable(Oid relationId);
|
|||
static bool ConditionalLockRelationWithTimeout(Relation rel, LOCKMODE lockMode,
|
||||
int timeout, int retryInterval);
|
||||
static void LogRelationStats(Relation rel, int elevel);
|
||||
static char * CompressionTypeStr(CompressionType type);
|
||||
static void TruncateCStore(Relation rel, int elevel);
|
||||
|
||||
|
||||
/*
|
||||
* CStoreTableAMDefaultOptions returns the default options for a cstore table am table.
|
||||
* These options are based on the GUC's controlling the defaults.
|
||||
*/
|
||||
static CStoreOptions *
|
||||
CStoreTableAMGetOptions(void)
|
||||
CStoreTableAMDefaultOptions()
|
||||
{
|
||||
CStoreOptions *cstoreOptions = palloc0(sizeof(CStoreOptions));
|
||||
cstoreOptions->compressionType = cstore_compression;
|
||||
|
@ -111,6 +115,27 @@ CStoreTableAMGetOptions(void)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* CStoreTableAMGetOptions returns the options based on a relation. It is advised the
|
||||
* relation is a cstore table am table, if not it will raise an error
|
||||
*/
|
||||
static CStoreOptions *
|
||||
CStoreTableAMGetOptions(Relation rel)
|
||||
{
|
||||
CStoreOptions *cstoreOptions = NULL;
|
||||
DataFileMetadata *metadata = NULL;
|
||||
|
||||
Assert(rel != NULL);
|
||||
|
||||
cstoreOptions = palloc0(sizeof(CStoreOptions));
|
||||
metadata = ReadDataFileMetadata(rel->rd_node.relNode, false);
|
||||
cstoreOptions->compressionType = metadata->compression;
|
||||
cstoreOptions->stripeRowCount = metadata->stripeRowCount;
|
||||
cstoreOptions->blockRowCount = metadata->blockRowCount;
|
||||
return cstoreOptions;
|
||||
}
|
||||
|
||||
|
||||
static MemoryContext
|
||||
GetCStoreMemoryContext()
|
||||
{
|
||||
|
@ -145,7 +170,7 @@ cstore_init_write_state(Relation relation)
|
|||
|
||||
if (CStoreWriteState == NULL)
|
||||
{
|
||||
CStoreOptions *cstoreOptions = CStoreTableAMGetOptions();
|
||||
CStoreOptions *cstoreOptions = CStoreTableAMGetOptions(relation);
|
||||
TupleDesc tupdesc = RelationGetDescr(relation);
|
||||
|
||||
elog(LOG, "initializing write state for relation %d", relation->rd_id);
|
||||
|
@ -534,17 +559,23 @@ cstore_relation_set_new_filenode(Relation rel,
|
|||
SMgrRelation srel;
|
||||
DataFileMetadata *metadata = ReadDataFileMetadata(rel->rd_node.relNode, true);
|
||||
uint64 blockRowCount = 0;
|
||||
uint64 stripeRowCount = 0;
|
||||
CompressionType compression = 0;
|
||||
|
||||
if (metadata != NULL)
|
||||
{
|
||||
/* existing table (e.g. TRUNCATE), use existing blockRowCount */
|
||||
blockRowCount = metadata->blockRowCount;
|
||||
stripeRowCount = metadata->stripeRowCount;
|
||||
compression = metadata->compression;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* new table, use options */
|
||||
CStoreOptions *options = CStoreTableAMGetOptions();
|
||||
CStoreOptions *options = CStoreTableAMDefaultOptions();
|
||||
blockRowCount = options->blockRowCount;
|
||||
stripeRowCount = options->stripeRowCount;
|
||||
compression = options->compressionType;
|
||||
}
|
||||
|
||||
/* delete old relfilenode metadata */
|
||||
|
@ -554,7 +585,8 @@ cstore_relation_set_new_filenode(Relation rel,
|
|||
*freezeXid = RecentXmin;
|
||||
*minmulti = GetOldestMultiXactId();
|
||||
srel = RelationCreateStorage(*newrnode, persistence);
|
||||
InitCStoreDataFileMetadata(newrnode->relNode, blockRowCount);
|
||||
InitCStoreDataFileMetadata(newrnode->relNode, blockRowCount, stripeRowCount,
|
||||
compression);
|
||||
smgrclose(srel);
|
||||
}
|
||||
|
||||
|
@ -575,7 +607,8 @@ cstore_relation_nontransactional_truncate(Relation rel)
|
|||
|
||||
/* Delete old relfilenode metadata and recreate it */
|
||||
DeleteDataFileMetadataRowIfExists(rel->rd_node.relNode);
|
||||
InitCStoreDataFileMetadata(rel->rd_node.relNode, metadata->blockRowCount);
|
||||
InitCStoreDataFileMetadata(rel->rd_node.relNode, metadata->blockRowCount,
|
||||
metadata->stripeRowCount, metadata->compression);
|
||||
}
|
||||
|
||||
|
||||
|
@ -623,7 +656,19 @@ cstore_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
|||
*/
|
||||
Assert(sourceDesc->natts == targetDesc->natts);
|
||||
|
||||
cstoreOptions = CStoreTableAMGetOptions();
|
||||
/*
|
||||
* Since we are copying into a new relation we need to copy the settings from the old
|
||||
* relation first.
|
||||
*/
|
||||
|
||||
cstoreOptions = CStoreTableAMGetOptions(OldHeap);
|
||||
|
||||
UpdateCStoreDataFileMetadata(NewHeap->rd_node.relNode,
|
||||
cstoreOptions->blockRowCount,
|
||||
cstoreOptions->stripeRowCount,
|
||||
cstoreOptions->compressionType);
|
||||
|
||||
cstoreOptions = CStoreTableAMGetOptions(NewHeap);
|
||||
|
||||
writeState = CStoreBeginWrite(NewHeap,
|
||||
cstoreOptions->compressionType,
|
||||
|
@ -756,26 +801,6 @@ LogRelationStats(Relation rel, int elevel)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* CompressionTypeStr returns string representation of a compression type.
|
||||
*/
|
||||
static char *
|
||||
CompressionTypeStr(CompressionType type)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case COMPRESSION_NONE:
|
||||
return "none";
|
||||
|
||||
case COMPRESSION_PG_LZ:
|
||||
return "pglz";
|
||||
|
||||
default:
|
||||
return "unknown";
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TruncateCStore truncates the unused space at the end of main fork for
|
||||
* a cstore table. This unused space can be created by aborted transactions.
|
||||
|
@ -1262,3 +1287,133 @@ cstore_tableam_handler(PG_FUNCTION_ARGS)
|
|||
{
|
||||
PG_RETURN_POINTER(&cstore_am_methods);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* alter_cstore_table_set is a UDF exposed in postgres to change settings on a columnar
|
||||
* table. Calling this function on a non-columnar table gives an error.
|
||||
*
|
||||
* sql syntax:
|
||||
* pg_catalog.alter_cstore_table_set(
|
||||
* table_name regclass,
|
||||
* block_row_count int DEFAULT NULL,
|
||||
* stripe_row_count int DEFAULT NULL,
|
||||
* compression name DEFAULT null)
|
||||
*
|
||||
* All arguments except the table name are optional. The UDF is supposed to be called
|
||||
* like:
|
||||
* SELECT alter_cstore_table_set('table', compression => 'pglz');
|
||||
*
|
||||
* This will only update the compression of the table, keeping all other settings the
|
||||
* same. Multiple settings can be changed at the same time by providing multiple
|
||||
* arguments. Calling the argument with the NULL value will be interperted as not having
|
||||
* provided the argument.
|
||||
*/
|
||||
PG_FUNCTION_INFO_V1(alter_cstore_table_set);
|
||||
Datum
|
||||
alter_cstore_table_set(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid relationId = PG_GETARG_OID(0);
|
||||
int blockRowCount = 0;
|
||||
int stripeRowCount = 0;
|
||||
CompressionType compression = COMPRESSION_TYPE_INVALID;
|
||||
|
||||
Relation rel = table_open(relationId, AccessExclusiveLock); /* ALTER TABLE LOCK */
|
||||
DataFileMetadata *metadata = ReadDataFileMetadata(rel->rd_node.relNode, true);
|
||||
if (!metadata)
|
||||
{
|
||||
ereport(ERROR, (errmsg("table %s is not a cstore table",
|
||||
quote_identifier(RelationGetRelationName(rel)))));
|
||||
}
|
||||
|
||||
blockRowCount = metadata->blockRowCount;
|
||||
stripeRowCount = metadata->stripeRowCount;
|
||||
compression = metadata->compression;
|
||||
|
||||
/* block_row_count => not null */
|
||||
if (!PG_ARGISNULL(1))
|
||||
{
|
||||
blockRowCount = PG_GETARG_INT32(1);
|
||||
ereport(DEBUG1, (errmsg("updating block row count to %d", blockRowCount)));
|
||||
}
|
||||
|
||||
/* stripe_row_count => not null */
|
||||
if (!PG_ARGISNULL(2))
|
||||
{
|
||||
stripeRowCount = PG_GETARG_INT32(2);
|
||||
ereport(DEBUG1, (errmsg("updating stripe row count to %d", stripeRowCount)));
|
||||
}
|
||||
|
||||
/* compression => not null */
|
||||
if (!PG_ARGISNULL(3))
|
||||
{
|
||||
Name compressionName = PG_GETARG_NAME(3);
|
||||
compression = ParseCompressionType(NameStr(*compressionName));
|
||||
if (compression == COMPRESSION_TYPE_INVALID)
|
||||
{
|
||||
ereport(ERROR, (errmsg("unknown compression type for cstore table: %s",
|
||||
quote_identifier(NameStr(*compressionName)))));
|
||||
}
|
||||
ereport(DEBUG1, (errmsg("updating compression to %s",
|
||||
CompressionTypeStr(compression))));
|
||||
}
|
||||
|
||||
UpdateCStoreDataFileMetadata(rel->rd_node.relNode, blockRowCount, stripeRowCount,
|
||||
compression);
|
||||
|
||||
table_close(rel, NoLock);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
PG_FUNCTION_INFO_V1(alter_cstore_table_reset);
|
||||
Datum
|
||||
alter_cstore_table_reset(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid relationId = PG_GETARG_OID(0);
|
||||
int blockRowCount = 0;
|
||||
int stripeRowCount = 0;
|
||||
CompressionType compression = COMPRESSION_TYPE_INVALID;
|
||||
|
||||
Relation rel = table_open(relationId, AccessExclusiveLock); /* ALTER TABLE LOCK */
|
||||
DataFileMetadata *metadata = ReadDataFileMetadata(rel->rd_node.relNode, true);
|
||||
if (!metadata)
|
||||
{
|
||||
ereport(ERROR, (errmsg("table %s is not a cstore table",
|
||||
quote_identifier(RelationGetRelationName(rel)))));
|
||||
}
|
||||
|
||||
blockRowCount = metadata->blockRowCount;
|
||||
stripeRowCount = metadata->stripeRowCount;
|
||||
compression = metadata->compression;
|
||||
|
||||
/* block_row_count => true */
|
||||
if (!PG_ARGISNULL(1) && PG_GETARG_BOOL(1))
|
||||
{
|
||||
blockRowCount = cstore_block_row_count;
|
||||
ereport(DEBUG1, (errmsg("resetting block row count to %d", blockRowCount)));
|
||||
}
|
||||
|
||||
/* stripe_row_count => true */
|
||||
if (!PG_ARGISNULL(2) && PG_GETARG_BOOL(2))
|
||||
{
|
||||
stripeRowCount = cstore_stripe_row_count;
|
||||
ereport(DEBUG1, (errmsg("resetting stripe row count to %d", stripeRowCount)));
|
||||
}
|
||||
|
||||
/* compression => true */
|
||||
if (!PG_ARGISNULL(3) && PG_GETARG_BOOL(3))
|
||||
{
|
||||
compression = cstore_compression;
|
||||
ereport(DEBUG1, (errmsg("resetting compression to %s",
|
||||
CompressionTypeStr(compression))));
|
||||
}
|
||||
|
||||
UpdateCStoreDataFileMetadata(rel->rd_node.relNode, blockRowCount, stripeRowCount,
|
||||
compression);
|
||||
|
||||
table_close(rel, NoLock);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,179 @@
|
|||
CREATE SCHEMA am_tableoptions;
|
||||
SET search_path TO am_tableoptions;
|
||||
CREATE TABLE table_options (a int) USING cstore_tableam;
|
||||
INSERT INTO table_options SELECT generate_series(1,100);
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
regclass | block_row_count | stripe_row_count | compression
|
||||
---------------+-----------------+------------------+-------------
|
||||
table_options | 10000 | 150000 | none
|
||||
(1 row)
|
||||
|
||||
-- test changing the compression
|
||||
SELECT alter_cstore_table_set('table_options', compression => 'pglz');
|
||||
alter_cstore_table_set
|
||||
------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
regclass | block_row_count | stripe_row_count | compression
|
||||
---------------+-----------------+------------------+-------------
|
||||
table_options | 10000 | 150000 | pglz
|
||||
(1 row)
|
||||
|
||||
-- test changing the block_row_count
|
||||
SELECT alter_cstore_table_set('table_options', block_row_count => 10);
|
||||
alter_cstore_table_set
|
||||
------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
regclass | block_row_count | stripe_row_count | compression
|
||||
---------------+-----------------+------------------+-------------
|
||||
table_options | 10 | 150000 | pglz
|
||||
(1 row)
|
||||
|
||||
-- test changing the block_row_count
|
||||
SELECT alter_cstore_table_set('table_options', stripe_row_count => 100);
|
||||
alter_cstore_table_set
|
||||
------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
regclass | block_row_count | stripe_row_count | compression
|
||||
---------------+-----------------+------------------+-------------
|
||||
table_options | 10 | 100 | pglz
|
||||
(1 row)
|
||||
|
||||
-- VACUUM FULL creates a new table, make sure it copies settings from the table you are vacuuming
|
||||
VACUUM FULL table_options;
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
regclass | block_row_count | stripe_row_count | compression
|
||||
---------------+-----------------+------------------+-------------
|
||||
table_options | 10 | 100 | pglz
|
||||
(1 row)
|
||||
|
||||
-- set all settings at the same time
|
||||
SELECT alter_cstore_table_set('table_options', stripe_row_count => 1000, block_row_count => 100, compression => 'none');
|
||||
alter_cstore_table_set
|
||||
------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
regclass | block_row_count | stripe_row_count | compression
|
||||
---------------+-----------------+------------------+-------------
|
||||
table_options | 100 | 1000 | none
|
||||
(1 row)
|
||||
|
||||
-- reset settings one by one to the version of the GUC's
|
||||
SET cstore.block_row_count TO 1000;
|
||||
SET cstore.stripe_row_count TO 10000;
|
||||
SET cstore.compression TO 'pglz';
|
||||
-- verify setting the GUC's didn't change the settings
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
regclass | block_row_count | stripe_row_count | compression
|
||||
---------------+-----------------+------------------+-------------
|
||||
table_options | 100 | 1000 | none
|
||||
(1 row)
|
||||
|
||||
SELECT alter_cstore_table_reset('table_options', block_row_count => true);
|
||||
alter_cstore_table_reset
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
regclass | block_row_count | stripe_row_count | compression
|
||||
---------------+-----------------+------------------+-------------
|
||||
table_options | 1000 | 1000 | none
|
||||
(1 row)
|
||||
|
||||
SELECT alter_cstore_table_reset('table_options', stripe_row_count => true);
|
||||
alter_cstore_table_reset
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
regclass | block_row_count | stripe_row_count | compression
|
||||
---------------+-----------------+------------------+-------------
|
||||
table_options | 1000 | 10000 | none
|
||||
(1 row)
|
||||
|
||||
SELECT alter_cstore_table_reset('table_options', compression => true);
|
||||
alter_cstore_table_reset
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
regclass | block_row_count | stripe_row_count | compression
|
||||
---------------+-----------------+------------------+-------------
|
||||
table_options | 1000 | 10000 | pglz
|
||||
(1 row)
|
||||
|
||||
-- verify resetting all settings at once work
|
||||
SET cstore.block_row_count TO 10000;
|
||||
SET cstore.stripe_row_count TO 100000;
|
||||
SET cstore.compression TO 'none';
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
regclass | block_row_count | stripe_row_count | compression
|
||||
---------------+-----------------+------------------+-------------
|
||||
table_options | 1000 | 10000 | pglz
|
||||
(1 row)
|
||||
|
||||
SELECT alter_cstore_table_reset(
|
||||
'table_options',
|
||||
block_row_count => true,
|
||||
stripe_row_count => true,
|
||||
compression => true);
|
||||
alter_cstore_table_reset
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
regclass | block_row_count | stripe_row_count | compression
|
||||
---------------+-----------------+------------------+-------------
|
||||
table_options | 10000 | 100000 | none
|
||||
(1 row)
|
||||
|
||||
-- verify edge cases
|
||||
-- first start with a table that is not a cstore table
|
||||
CREATE TABLE not_a_cstore_table (a int);
|
||||
SELECT alter_cstore_table_set('not_a_cstore_table', compression => 'pglz');
|
||||
ERROR: table not_a_cstore_table is not a cstore table
|
||||
SELECT alter_cstore_table_reset('not_a_cstore_table', compression => true);
|
||||
ERROR: table not_a_cstore_table is not a cstore table
|
||||
-- verify you can't use a compression that is not known
|
||||
SELECT alter_cstore_table_set('table_options', compression => 'foobar');
|
||||
ERROR: unknown compression type for cstore table: foobar
|
||||
SET client_min_messages TO warning;
|
||||
DROP SCHEMA am_tableoptions CASCADE;
|
|
@ -36,7 +36,12 @@ SELECT count(*) FROM cstore.cstore_stripes a, pg_class b WHERE a.relfilenode=b.r
|
|||
(1 row)
|
||||
|
||||
-- test the case when all data cannot fit into a single stripe
|
||||
SET cstore.stripe_row_count TO 1000;
|
||||
SELECT alter_cstore_table_set('t', stripe_row_count => 1000);
|
||||
alter_cstore_table_set
|
||||
------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO t SELECT i, 2 * i FROM generate_series(1,2500) i;
|
||||
SELECT sum(a), sum(b) FROM t;
|
||||
sum | sum
|
||||
|
@ -157,14 +162,25 @@ SELECT count(*) FROM t;
|
|||
-- add some stripes with different compression types and create some gaps,
|
||||
-- then vacuum to print stats
|
||||
BEGIN;
|
||||
SET cstore.block_row_count TO 1000;
|
||||
SET cstore.stripe_row_count TO 2000;
|
||||
SET cstore.compression TO "pglz";
|
||||
SELECT alter_cstore_table_set('t',
|
||||
block_row_count => 1000,
|
||||
stripe_row_count => 2000,
|
||||
compression => 'pglz');
|
||||
alter_cstore_table_set
|
||||
------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SAVEPOINT s1;
|
||||
INSERT INTO t SELECT i FROM generate_series(1, 1500) i;
|
||||
ROLLBACK TO SAVEPOINT s1;
|
||||
INSERT INTO t SELECT i / 5 FROM generate_series(1, 1500) i;
|
||||
SET cstore.compression TO "none";
|
||||
SELECT alter_cstore_table_set('t', compression => 'none');
|
||||
alter_cstore_table_set
|
||||
------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SAVEPOINT s2;
|
||||
INSERT INTO t SELECT i FROM generate_series(1, 1500) i;
|
||||
ROLLBACK TO SAVEPOINT s2;
|
||||
|
@ -195,7 +211,12 @@ block count: 11, containing data for dropped columns: 2, none compressed: 9, pgl
|
|||
-- vacuum full should remove blocks for dropped columns
|
||||
-- note that, a block will be stored in non-compressed for if compression
|
||||
-- doesn't reduce its size.
|
||||
SET cstore.compression TO "pglz";
|
||||
SELECT alter_cstore_table_set('t', compression => 'pglz');
|
||||
alter_cstore_table_set
|
||||
------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
VACUUM FULL t;
|
||||
VACUUM VERBOSE t;
|
||||
INFO: statistics for "t":
|
||||
|
|
|
@ -0,0 +1,102 @@
|
|||
CREATE SCHEMA am_tableoptions;
|
||||
SET search_path TO am_tableoptions;
|
||||
|
||||
CREATE TABLE table_options (a int) USING cstore_tableam;
|
||||
INSERT INTO table_options SELECT generate_series(1,100);
|
||||
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
|
||||
-- test changing the compression
|
||||
SELECT alter_cstore_table_set('table_options', compression => 'pglz');
|
||||
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
|
||||
-- test changing the block_row_count
|
||||
SELECT alter_cstore_table_set('table_options', block_row_count => 10);
|
||||
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
|
||||
-- test changing the block_row_count
|
||||
SELECT alter_cstore_table_set('table_options', stripe_row_count => 100);
|
||||
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
|
||||
-- VACUUM FULL creates a new table, make sure it copies settings from the table you are vacuuming
|
||||
VACUUM FULL table_options;
|
||||
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
|
||||
-- set all settings at the same time
|
||||
SELECT alter_cstore_table_set('table_options', stripe_row_count => 1000, block_row_count => 100, compression => 'none');
|
||||
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
|
||||
-- reset settings one by one to the version of the GUC's
|
||||
SET cstore.block_row_count TO 1000;
|
||||
SET cstore.stripe_row_count TO 10000;
|
||||
SET cstore.compression TO 'pglz';
|
||||
|
||||
-- verify setting the GUC's didn't change the settings
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
|
||||
SELECT alter_cstore_table_reset('table_options', block_row_count => true);
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
|
||||
SELECT alter_cstore_table_reset('table_options', stripe_row_count => true);
|
||||
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
|
||||
SELECT alter_cstore_table_reset('table_options', compression => true);
|
||||
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
|
||||
-- verify resetting all settings at once work
|
||||
SET cstore.block_row_count TO 10000;
|
||||
SET cstore.stripe_row_count TO 100000;
|
||||
SET cstore.compression TO 'none';
|
||||
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
|
||||
SELECT alter_cstore_table_reset(
|
||||
'table_options',
|
||||
block_row_count => true,
|
||||
stripe_row_count => true,
|
||||
compression => true);
|
||||
|
||||
-- show table_options settings
|
||||
SELECT * FROM cstore.cstore_options
|
||||
WHERE regclass = 'table_options'::regclass;
|
||||
|
||||
-- verify edge cases
|
||||
-- first start with a table that is not a cstore table
|
||||
CREATE TABLE not_a_cstore_table (a int);
|
||||
SELECT alter_cstore_table_set('not_a_cstore_table', compression => 'pglz');
|
||||
SELECT alter_cstore_table_reset('not_a_cstore_table', compression => true);
|
||||
|
||||
-- verify you can't use a compression that is not known
|
||||
SELECT alter_cstore_table_set('table_options', compression => 'foobar');
|
||||
|
||||
SET client_min_messages TO warning;
|
||||
DROP SCHEMA am_tableoptions CASCADE;
|
|
@ -18,7 +18,7 @@ SELECT sum(a), sum(b) FROM t;
|
|||
SELECT count(*) FROM cstore.cstore_stripes a, pg_class b WHERE a.relfilenode=b.relfilenode AND b.relname='t';
|
||||
|
||||
-- test the case when all data cannot fit into a single stripe
|
||||
SET cstore.stripe_row_count TO 1000;
|
||||
SELECT alter_cstore_table_set('t', stripe_row_count => 1000);
|
||||
INSERT INTO t SELECT i, 2 * i FROM generate_series(1,2500) i;
|
||||
|
||||
SELECT sum(a), sum(b) FROM t;
|
||||
|
@ -65,14 +65,15 @@ SELECT count(*) FROM t;
|
|||
-- then vacuum to print stats
|
||||
|
||||
BEGIN;
|
||||
SET cstore.block_row_count TO 1000;
|
||||
SET cstore.stripe_row_count TO 2000;
|
||||
SET cstore.compression TO "pglz";
|
||||
SELECT alter_cstore_table_set('t',
|
||||
block_row_count => 1000,
|
||||
stripe_row_count => 2000,
|
||||
compression => 'pglz');
|
||||
SAVEPOINT s1;
|
||||
INSERT INTO t SELECT i FROM generate_series(1, 1500) i;
|
||||
ROLLBACK TO SAVEPOINT s1;
|
||||
INSERT INTO t SELECT i / 5 FROM generate_series(1, 1500) i;
|
||||
SET cstore.compression TO "none";
|
||||
SELECT alter_cstore_table_set('t', compression => 'none');
|
||||
SAVEPOINT s2;
|
||||
INSERT INTO t SELECT i FROM generate_series(1, 1500) i;
|
||||
ROLLBACK TO SAVEPOINT s2;
|
||||
|
@ -93,7 +94,7 @@ VACUUM VERBOSE t;
|
|||
-- vacuum full should remove blocks for dropped columns
|
||||
-- note that, a block will be stored in non-compressed for if compression
|
||||
-- doesn't reduce its size.
|
||||
SET cstore.compression TO "pglz";
|
||||
SELECT alter_cstore_table_set('t', compression => 'pglz');
|
||||
VACUUM FULL t;
|
||||
VACUUM VERBOSE t;
|
||||
|
||||
|
|
Loading…
Reference in New Issue