refactor options to their own table linked to the regclass (#4346)

Columnar options were by accident linked to the relfilenode instead of the regclass/relation oid. This PR moves everything related to columnar options to their own catalog table.
pull/4353/head
Nils Dijk 2020-11-27 20:22:08 +01:00 committed by GitHub
parent af02ac6cf5
commit 383e334023
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 483 additions and 270 deletions

View File

@ -55,6 +55,8 @@ static void GetHighestUsedAddressAndId(Oid relfilenode,
static List * ReadDataFileStripeList(Oid relfilenode, Snapshot snapshot);
static Oid CStoreStripesRelationId(void);
static Oid CStoreStripesIndexRelationId(void);
static Oid ColumnarOptionsRelationId(void);
static Oid ColumnarOptionsIndexRegclass(void);
static Oid CStoreDataFilesRelationId(void);
static Oid CStoreDataFilesIndexRelationId(void);
static Oid CStoreSkipNodesRelationId(void);
@ -70,14 +72,36 @@ static EState * create_estate_for_relation(Relation rel);
static bytea * DatumToBytea(Datum value, Form_pg_attribute attrForm);
static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm);
static bool WriteColumnarOptions(Oid regclass, ColumnarOptions *options, bool overwrite);
/* constants for cstore.options */
#define Natts_cstore_options 4
#define Anum_cstore_options_regclass 1
#define Anum_cstore_options_block_row_count 2
#define Anum_cstore_options_stripe_row_count 3
#define Anum_cstore_options_compression 4
/* ----------------
* cstore.options definition.
* ----------------
*/
typedef struct FormData_cstore_options
{
Oid regclass;
int32 block_row_count;
int32 stripe_row_count;
NameData compression;
#ifdef CATALOG_VARLEN /* variable-length fields start here */
#endif
} FormData_cstore_options;
typedef FormData_cstore_options *Form_cstore_options;
/* constants for cstore_table */
#define Natts_cstore_data_files 6
#define Natts_cstore_data_files 3
#define Anum_cstore_data_files_relfilenode 1
#define Anum_cstore_data_files_block_row_count 2
#define Anum_cstore_data_files_stripe_row_count 3
#define Anum_cstore_data_files_compression 4
#define Anum_cstore_data_files_version_major 5
#define Anum_cstore_data_files_version_minor 6
#define Anum_cstore_data_files_version_major 2
#define Anum_cstore_data_files_version_minor 3
/* ----------------
* cstore.cstore_data_files definition.
@ -124,29 +148,229 @@ typedef FormData_cstore_data_files *Form_cstore_data_files;
#define Anum_cstore_skipnodes_value_compression_type 12
/*
* InitColumnarOptions initialized the columnar table options. Meaning it writes the
* default options to the options table if not already existing.
*
* The return value indicates if options have actually been written.
*/
bool
InitColumnarOptions(Oid regclass)
{
ColumnarOptions defaultOptions = {
.blockRowCount = cstore_block_row_count,
.stripeRowCount = cstore_stripe_row_count,
.compressionType = cstore_compression,
};
return WriteColumnarOptions(regclass, &defaultOptions, false);
}
/*
* SetColumnarOptions writes the passed table options as the authoritive options to the
* table irregardless of the optiones already existing or not. This can be used to put a
* table in a certain state.
*/
void
SetColumnarOptions(Oid regclass, ColumnarOptions *options)
{
WriteColumnarOptions(regclass, options, true);
}
/*
* WriteColumnarOptions writes the options to the catalog table for a given regclass.
* - If overwrite is false it will only write the values if there is not already a record
* found.
* - If overwrite is true it will always write the settings
*
* The return value indicates if the record has been written.
*/
static bool
WriteColumnarOptions(Oid regclass, ColumnarOptions *options, bool overwrite)
{
bool written = false;
bool nulls[Natts_cstore_options] = { 0 };
Datum values[Natts_cstore_options] = {
ObjectIdGetDatum(regclass),
Int32GetDatum(options->blockRowCount),
Int32GetDatum(options->stripeRowCount),
0, /* to be filled below */
};
NameData compressionName = { 0 };
namestrcpy(&compressionName, CompressionTypeStr(options->compressionType));
values[Anum_cstore_options_compression - 1] = NameGetDatum(&compressionName);
/* create heap tuple and insert into catalog table */
Relation columnarOptions = relation_open(ColumnarOptionsRelationId(),
RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(columnarOptions);
/* find existing item to perform update if exist */
ScanKeyData scanKey[1] = { 0 };
ScanKeyInit(&scanKey[0], Anum_cstore_options_regclass, BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(regclass));
Relation index = index_open(ColumnarOptionsIndexRegclass(), AccessShareLock);
SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarOptions, index, NULL,
1, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(heapTuple))
{
if (overwrite)
{
/* TODO check if the options are actually different, skip if not changed */
/* update existing record */
bool update[Natts_cstore_options] = { 0 };
update[Anum_cstore_options_block_row_count - 1] = true;
update[Anum_cstore_options_stripe_row_count - 1] = true;
update[Anum_cstore_options_compression - 1] = true;
HeapTuple tuple = heap_modify_tuple(heapTuple, tupleDescriptor,
values, nulls, update);
CatalogTupleUpdate(columnarOptions, &tuple->t_self, tuple);
written = true;
}
}
else
{
/* inserting new record */
HeapTuple newTuple = heap_form_tuple(tupleDescriptor, values, nulls);
CatalogTupleInsert(columnarOptions, newTuple);
written = true;
}
if (written)
{
CommandCounterIncrement();
}
systable_endscan_ordered(scanDescriptor);
index_close(index, NoLock);
relation_close(columnarOptions, NoLock);
return written;
}
/*
* DeleteColumnarTableOptions removes the columnar table options for a regclass. When
* missingOk is false it will throw an error when no table options can be found.
*
* Returns whether a record has been removed.
*/
bool
DeleteColumnarTableOptions(Oid regclass, bool missingOk)
{
bool result = false;
Relation columnarOptions = relation_open(ColumnarOptionsRelationId(),
RowExclusiveLock);
/* find existing item to remove */
ScanKeyData scanKey[1] = { 0 };
ScanKeyInit(&scanKey[0], Anum_cstore_options_regclass, BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(regclass));
Relation index = index_open(ColumnarOptionsIndexRegclass(), AccessShareLock);
SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarOptions, index, NULL,
1, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(heapTuple))
{
CatalogTupleDelete(columnarOptions, &heapTuple->t_self);
CommandCounterIncrement();
result = true;
}
else if (!missingOk)
{
ereport(ERROR, (errmsg("missing options for regclass: %d", regclass)));
}
systable_endscan_ordered(scanDescriptor);
index_close(index, NoLock);
relation_close(columnarOptions, NoLock);
return result;
}
bool
ReadColumnarOptions(Oid regclass, ColumnarOptions *options)
{
ScanKeyData scanKey[1];
ScanKeyInit(&scanKey[0], Anum_cstore_options_regclass, BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(regclass));
Oid columnarOptionsOid = ColumnarOptionsRelationId();
Relation columnarOptions = try_relation_open(columnarOptionsOid, AccessShareLock);
if (columnarOptions == NULL)
{
/*
* Extension has been dropped. This can be called while
* dropping extension or database via ObjectAccess().
*/
return false;
}
Relation index = try_relation_open(ColumnarOptionsIndexRegclass(), AccessShareLock);
if (index == NULL)
{
heap_close(columnarOptions, NoLock);
/* extension has been dropped */
return false;
}
SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarOptions, index, NULL,
1, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(heapTuple))
{
Form_cstore_options tupOptions = (Form_cstore_options) GETSTRUCT(heapTuple);
options->blockRowCount = tupOptions->block_row_count;
options->stripeRowCount = tupOptions->stripe_row_count;
options->compressionType = ParseCompressionType(NameStr(tupOptions->compression));
}
else
{
/* populate options with system defaults */
options->compressionType = cstore_compression;
options->stripeRowCount = cstore_stripe_row_count;
options->blockRowCount = cstore_block_row_count;
}
systable_endscan_ordered(scanDescriptor);
index_close(index, NoLock);
relation_close(columnarOptions, NoLock);
return true;
}
/*
* InitCStoreDataFileMetadata adds a record for the given relfilenode
* in cstore_data_files.
*/
void
InitCStoreDataFileMetadata(Oid relfilenode, int blockRowCount, int stripeRowCount,
CompressionType compression)
InitCStoreDataFileMetadata(Oid relfilenode)
{
NameData compressionName = { 0 };
bool nulls[Natts_cstore_data_files] = { 0 };
Datum values[Natts_cstore_data_files] = {
ObjectIdGetDatum(relfilenode),
Int32GetDatum(blockRowCount),
Int32GetDatum(stripeRowCount),
0, /* to be filled below */
Int32GetDatum(CSTORE_VERSION_MAJOR),
Int32GetDatum(CSTORE_VERSION_MINOR)
};
namestrcpy(&compressionName, CompressionTypeStr(compression));
values[Anum_cstore_data_files_compression - 1] = NameGetDatum(&compressionName);
DeleteDataFileMetadataRowIfExists(relfilenode);
Oid cstoreDataFilesOid = CStoreDataFilesRelationId();
@ -162,81 +386,6 @@ InitCStoreDataFileMetadata(Oid relfilenode, int blockRowCount, int stripeRowCoun
}
void
UpdateCStoreDataFileMetadata(Oid relfilenode, int blockRowCount, int stripeRowCount,
CompressionType compression)
{
const int scanKeyCount = 1;
ScanKeyData scanKey[1];
bool indexOK = true;
Datum values[Natts_cstore_data_files] = { 0 };
bool isnull[Natts_cstore_data_files] = { 0 };
bool replace[Natts_cstore_data_files] = { 0 };
bool changed = false;
Relation cstoreDataFiles = heap_open(CStoreDataFilesRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(cstoreDataFiles);
ScanKeyInit(&scanKey[0], Anum_cstore_data_files_relfilenode, BTEqualStrategyNumber,
F_INT8EQ, ObjectIdGetDatum(relfilenode));
SysScanDesc scanDescriptor = systable_beginscan(cstoreDataFiles,
CStoreDataFilesIndexRelationId(),
indexOK,
NULL, scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (heapTuple == NULL)
{
ereport(ERROR, (errmsg("relfilenode %d doesn't belong to a cstore table",
relfilenode)));
}
Form_cstore_data_files metadata = (Form_cstore_data_files) GETSTRUCT(heapTuple);
if (metadata->block_row_count != blockRowCount)
{
values[Anum_cstore_data_files_block_row_count - 1] = Int32GetDatum(blockRowCount);
isnull[Anum_cstore_data_files_block_row_count - 1] = false;
replace[Anum_cstore_data_files_block_row_count - 1] = true;
changed = true;
}
if (metadata->stripe_row_count != stripeRowCount)
{
values[Anum_cstore_data_files_stripe_row_count - 1] = Int32GetDatum(
stripeRowCount);
isnull[Anum_cstore_data_files_stripe_row_count - 1] = false;
replace[Anum_cstore_data_files_stripe_row_count - 1] = true;
changed = true;
}
if (ParseCompressionType(NameStr(metadata->compression)) != compression)
{
Name compressionName = palloc0(sizeof(NameData));
namestrcpy(compressionName, CompressionTypeStr(compression));
values[Anum_cstore_data_files_compression - 1] = NameGetDatum(compressionName);
isnull[Anum_cstore_data_files_compression - 1] = false;
replace[Anum_cstore_data_files_compression - 1] = true;
changed = true;
}
if (changed)
{
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull,
replace);
CatalogTupleUpdate(cstoreDataFiles, &heapTuple->t_self, heapTuple);
CommandCounterIncrement();
}
systable_endscan(scanDescriptor);
heap_close(cstoreDataFiles, NoLock);
}
/*
* SaveStripeSkipList saves StripeSkipList for a given stripe as rows
* of cstore_skipnodes.
@ -662,28 +811,12 @@ ReadCStoreDataFiles(Oid relfilenode, DataFileMetadata *metadata)
return false;
}
TupleDesc tupleDescriptor = RelationGetDescr(cstoreDataFiles);
SysScanDesc scanDescriptor = systable_beginscan_ordered(cstoreDataFiles, index, NULL,
1, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(heapTuple))
{
Datum datumArray[Natts_cstore_data_files];
bool isNullArray[Natts_cstore_data_files];
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
if (metadata)
{
metadata->blockRowCount = DatumGetInt32(
datumArray[Anum_cstore_data_files_block_row_count - 1]);
metadata->stripeRowCount = DatumGetInt32(
datumArray[Anum_cstore_data_files_stripe_row_count - 1]);
Name compressionName = DatumGetName(
datumArray[Anum_cstore_data_files_compression - 1]);
metadata->compression = ParseCompressionType(NameStr(*compressionName));
}
found = true;
}
@ -943,6 +1076,26 @@ CStoreStripesIndexRelationId(void)
}
/*
* ColumnarOptionsRelationId returns relation id of cstore.options.
*/
static Oid
ColumnarOptionsRelationId(void)
{
return get_relname_relid("options", CStoreNamespaceId());
}
/*
* ColumnarOptionsIndexRegclass returns relation id of cstore.options_pkey.
*/
static Oid
ColumnarOptionsIndexRegclass(void)
{
return get_relname_relid("options_pkey", CStoreNamespaceId());
}
/*
* CStoreDataFilesRelationId returns relation id of cstore_data_files.
* TODO: should we cache this similar to citus?

View File

@ -113,39 +113,6 @@ static void LogRelationStats(Relation rel, int elevel);
static void TruncateCStore(Relation rel, int elevel);
/*
* CStoreTableAMDefaultOptions returns the default options for a cstore table am table.
* These options are based on the GUC's controlling the defaults.
*/
static CStoreOptions *
CStoreTableAMDefaultOptions()
{
CStoreOptions *cstoreOptions = palloc0(sizeof(CStoreOptions));
cstoreOptions->compressionType = cstore_compression;
cstoreOptions->stripeRowCount = cstore_stripe_row_count;
cstoreOptions->blockRowCount = cstore_block_row_count;
return cstoreOptions;
}
/*
* CStoreTableAMGetOptions returns the options based on a relation. It is advised the
* relation is a cstore table am table, if not it will raise an error
*/
CStoreOptions *
CStoreTableAMGetOptions(Oid relfilenode)
{
Assert(OidIsValid(relfilenode));
CStoreOptions *cstoreOptions = palloc0(sizeof(CStoreOptions));
DataFileMetadata *metadata = ReadDataFileMetadata(relfilenode, false);
cstoreOptions->compressionType = metadata->compression;
cstoreOptions->stripeRowCount = metadata->stripeRowCount;
cstoreOptions->blockRowCount = metadata->blockRowCount;
return cstoreOptions;
}
static List *
RelationColumnList(Relation rel)
{
@ -450,7 +417,7 @@ cstore_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
* cstore_init_write_state allocates the write state in a longer
* lasting context, so no need to worry about it.
*/
TableWriteState *writeState = cstore_init_write_state(relation->rd_node,
TableWriteState *writeState = cstore_init_write_state(relation,
RelationGetDescr(relation),
GetCurrentSubTransactionId());
@ -496,7 +463,7 @@ static void
cstore_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
CommandId cid, int options, BulkInsertState bistate)
{
TableWriteState *writeState = cstore_init_write_state(relation->rd_node,
TableWriteState *writeState = cstore_init_write_state(relation,
RelationGetDescr(relation),
GetCurrentSubTransactionId());
@ -570,30 +537,10 @@ cstore_relation_set_new_filenode(Relation rel,
TransactionId *freezeXid,
MultiXactId *minmulti)
{
DataFileMetadata *metadata = ReadDataFileMetadata(rel->rd_node.relNode, true);
uint64 blockRowCount = 0;
uint64 stripeRowCount = 0;
CompressionType compression = 0;
Oid oldRelfilenode = rel->rd_node.relNode;
MarkRelfilenodeDropped(oldRelfilenode, GetCurrentSubTransactionId());
if (metadata != NULL)
{
/* existing table (e.g. TRUNCATE), use existing blockRowCount */
blockRowCount = metadata->blockRowCount;
stripeRowCount = metadata->stripeRowCount;
compression = metadata->compression;
}
else
{
/* new table, use options */
CStoreOptions *options = CStoreTableAMDefaultOptions();
blockRowCount = options->blockRowCount;
stripeRowCount = options->stripeRowCount;
compression = options->compressionType;
}
/* delete old relfilenode metadata */
DeleteDataFileMetadataRowIfExists(rel->rd_node.relNode);
@ -601,8 +548,10 @@ cstore_relation_set_new_filenode(Relation rel,
*freezeXid = RecentXmin;
*minmulti = GetOldestMultiXactId();
SMgrRelation srel = RelationCreateStorage(*newrnode, persistence);
InitCStoreDataFileMetadata(newrnode->relNode, blockRowCount, stripeRowCount,
compression);
InitCStoreDataFileMetadata(newrnode->relNode);
InitColumnarOptions(rel->rd_id);
smgrclose(srel);
}
@ -611,7 +560,6 @@ static void
cstore_relation_nontransactional_truncate(Relation rel)
{
Oid relfilenode = rel->rd_node.relNode;
DataFileMetadata *metadata = ReadDataFileMetadata(relfilenode, false);
NonTransactionDropWriteState(relfilenode);
@ -626,8 +574,7 @@ cstore_relation_nontransactional_truncate(Relation rel)
/* Delete old relfilenode metadata and recreate it */
DeleteDataFileMetadataRowIfExists(rel->rd_node.relNode);
InitCStoreDataFileMetadata(rel->rd_node.relNode, metadata->blockRowCount,
metadata->stripeRowCount, metadata->compression);
InitCStoreDataFileMetadata(rel->rd_node.relNode);
}
@ -670,24 +617,14 @@ cstore_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
*/
Assert(sourceDesc->natts == targetDesc->natts);
/*
* Since we are copying into a new relation we need to copy the settings from the old
* relation first.
*/
CStoreOptions *cstoreOptions = CStoreTableAMGetOptions(OldHeap->rd_node.relNode);
UpdateCStoreDataFileMetadata(NewHeap->rd_node.relNode,
cstoreOptions->blockRowCount,
cstoreOptions->stripeRowCount,
cstoreOptions->compressionType);
cstoreOptions = CStoreTableAMGetOptions(NewHeap->rd_node.relNode);
/* read settings from old heap, relfilenode will be swapped at the end */
ColumnarOptions cstoreOptions = { 0 };
ReadColumnarOptions(OldHeap->rd_id, &cstoreOptions);
TableWriteState *writeState = CStoreBeginWrite(NewHeap->rd_node,
cstoreOptions->compressionType,
cstoreOptions->stripeRowCount,
cstoreOptions->blockRowCount,
cstoreOptions.compressionType,
cstoreOptions.stripeRowCount,
cstoreOptions.blockRowCount,
targetDesc);
TableReadState *readState = CStoreBeginRead(OldHeap, sourceDesc,
@ -1236,6 +1173,7 @@ CStoreTableAMObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId
Relation rel = table_open(objectId, AccessExclusiveLock);
Oid relfilenode = rel->rd_node.relNode;
DeleteDataFileMetadataRowIfExists(relfilenode);
DeleteColumnarTableOptions(rel->rd_id, true);
MarkRelfilenodeDropped(relfilenode, GetCurrentSubTransactionId());
@ -1367,47 +1305,50 @@ alter_columnar_table_set(PG_FUNCTION_ARGS)
Oid relationId = PG_GETARG_OID(0);
Relation rel = table_open(relationId, AccessExclusiveLock); /* ALTER TABLE LOCK */
DataFileMetadata *metadata = ReadDataFileMetadata(rel->rd_node.relNode, true);
if (!metadata)
if (!IsCStoreTableAmTable(relationId))
{
ereport(ERROR, (errmsg("table %s is not a cstore table",
quote_identifier(RelationGetRelationName(rel)))));
}
int blockRowCount = metadata->blockRowCount;
int stripeRowCount = metadata->stripeRowCount;
CompressionType compression = metadata->compression;
ColumnarOptions options = { 0 };
if (!ReadColumnarOptions(relationId, &options))
{
ereport(ERROR, (errmsg("unable to read current options for table")));
}
/* block_row_count => not null */
if (!PG_ARGISNULL(1))
{
blockRowCount = PG_GETARG_INT32(1);
ereport(DEBUG1, (errmsg("updating block row count to %d", blockRowCount)));
options.blockRowCount = PG_GETARG_INT32(1);
ereport(DEBUG1,
(errmsg("updating block row count to %d", options.blockRowCount)));
}
/* stripe_row_count => not null */
if (!PG_ARGISNULL(2))
{
stripeRowCount = PG_GETARG_INT32(2);
ereport(DEBUG1, (errmsg("updating stripe row count to %d", stripeRowCount)));
options.stripeRowCount = PG_GETARG_INT32(2);
ereport(DEBUG1, (errmsg(
"updating stripe row count to " UINT64_FORMAT,
options.stripeRowCount)));
}
/* compression => not null */
if (!PG_ARGISNULL(3))
{
Name compressionName = PG_GETARG_NAME(3);
compression = ParseCompressionType(NameStr(*compressionName));
if (compression == COMPRESSION_TYPE_INVALID)
options.compressionType = ParseCompressionType(NameStr(*compressionName));
if (options.compressionType == COMPRESSION_TYPE_INVALID)
{
ereport(ERROR, (errmsg("unknown compression type for cstore table: %s",
quote_identifier(NameStr(*compressionName)))));
}
ereport(DEBUG1, (errmsg("updating compression to %s",
CompressionTypeStr(compression))));
CompressionTypeStr(options.compressionType))));
}
UpdateCStoreDataFileMetadata(rel->rd_node.relNode, blockRowCount, stripeRowCount,
compression);
SetColumnarOptions(relationId, &options);
table_close(rel, NoLock);
@ -1422,41 +1363,44 @@ alter_columnar_table_reset(PG_FUNCTION_ARGS)
Oid relationId = PG_GETARG_OID(0);
Relation rel = table_open(relationId, AccessExclusiveLock); /* ALTER TABLE LOCK */
DataFileMetadata *metadata = ReadDataFileMetadata(rel->rd_node.relNode, true);
if (!metadata)
if (!IsCStoreTableAmTable(relationId))
{
ereport(ERROR, (errmsg("table %s is not a cstore table",
quote_identifier(RelationGetRelationName(rel)))));
}
int blockRowCount = metadata->blockRowCount;
int stripeRowCount = metadata->stripeRowCount;
CompressionType compression = metadata->compression;
ColumnarOptions options = { 0 };
if (!ReadColumnarOptions(relationId, &options))
{
ereport(ERROR, (errmsg("unable to read current options for table")));
}
/* block_row_count => true */
if (!PG_ARGISNULL(1) && PG_GETARG_BOOL(1))
{
blockRowCount = cstore_block_row_count;
ereport(DEBUG1, (errmsg("resetting block row count to %d", blockRowCount)));
options.blockRowCount = cstore_block_row_count;
ereport(DEBUG1,
(errmsg("resetting block row count to %d", options.blockRowCount)));
}
/* stripe_row_count => true */
if (!PG_ARGISNULL(2) && PG_GETARG_BOOL(2))
{
stripeRowCount = cstore_stripe_row_count;
ereport(DEBUG1, (errmsg("resetting stripe row count to %d", stripeRowCount)));
options.stripeRowCount = cstore_stripe_row_count;
ereport(DEBUG1,
(errmsg("resetting stripe row count to " UINT64_FORMAT,
options.stripeRowCount)));
}
/* compression => true */
if (!PG_ARGISNULL(3) && PG_GETARG_BOOL(3))
{
compression = cstore_compression;
options.compressionType = cstore_compression;
ereport(DEBUG1, (errmsg("resetting compression to %s",
CompressionTypeStr(compression))));
CompressionTypeStr(options.compressionType))));
}
UpdateCStoreDataFileMetadata(rel->rd_node.relNode, blockRowCount, stripeRowCount,
compression);
SetColumnarOptions(relationId, &options);
table_close(rel, NoLock);

View File

@ -3,11 +3,17 @@
CREATE SCHEMA cstore;
SET search_path TO cstore;
CREATE TABLE cstore_data_files (
relfilenode oid NOT NULL,
CREATE TABLE options (
regclass regclass NOT NULL PRIMARY KEY,
block_row_count int NOT NULL,
stripe_row_count int NOT NULL,
compression name NOT NULL,
compression name NOT NULL
);
COMMENT ON TABLE options IS 'columnar table specific options, maintained by alter_columnar_table_set';
CREATE TABLE cstore_data_files (
relfilenode oid NOT NULL,
version_major bigint NOT NULL,
version_minor bigint NOT NULL,
PRIMARY KEY (relfilenode)
@ -49,16 +55,6 @@ CREATE TABLE cstore_skipnodes (
COMMENT ON TABLE cstore_skipnodes IS 'CStore per block metadata';
CREATE VIEW columnar_options AS
SELECT c.oid::regclass regclass,
d.block_row_count,
d.stripe_row_count,
d.compression
FROM pg_class c
JOIN cstore.cstore_data_files d USING(relfilenode);
COMMENT ON VIEW columnar_options IS 'CStore per table settings';
DO $proc$
BEGIN

View File

@ -27,10 +27,10 @@ IF substring(current_Setting('server_version'), '\d+')::int >= 12 THEN
END IF;
END$proc$;
DROP VIEW columnar_options;
DROP TABLE cstore_skipnodes;
DROP TABLE cstore_stripes;
DROP TABLE cstore_data_files;
DROP TABLE options;
DROP FUNCTION citus_internal.cstore_ensure_objects_exist();

View File

@ -116,7 +116,7 @@ CleanupWriteStateMap(void *arg)
TableWriteState *
cstore_init_write_state(RelFileNode relfilenode, TupleDesc tupdesc,
cstore_init_write_state(Relation relation, TupleDesc tupdesc,
SubTransactionId currentSubXid)
{
bool found;
@ -148,7 +148,7 @@ cstore_init_write_state(RelFileNode relfilenode, TupleDesc tupdesc,
MemoryContextRegisterResetCallback(WriteStateContext, &cleanupCallback);
}
WriteStateMapEntry *hashEntry = hash_search(WriteStateMap, &relfilenode.relNode,
WriteStateMapEntry *hashEntry = hash_search(WriteStateMap, &relation->rd_node.relNode,
HASH_ENTER, &found);
if (!found)
{
@ -178,12 +178,14 @@ cstore_init_write_state(RelFileNode relfilenode, TupleDesc tupdesc,
*/
MemoryContext oldContext = MemoryContextSwitchTo(WriteStateContext);
CStoreOptions *cstoreOptions = CStoreTableAMGetOptions(relfilenode.relNode);
ColumnarOptions cstoreOptions = { 0 };
ReadColumnarOptions(relation->rd_id, &cstoreOptions);
SubXidWriteState *stackEntry = palloc0(sizeof(SubXidWriteState));
stackEntry->writeState = CStoreBeginWrite(relfilenode,
cstoreOptions->compressionType,
cstoreOptions->stripeRowCount,
cstoreOptions->blockRowCount,
stackEntry->writeState = CStoreBeginWrite(relation->rd_node,
cstoreOptions.compressionType,
cstoreOptions.stripeRowCount,
cstoreOptions.blockRowCount,
tupdesc);
stackEntry->subXid = currentSubXid;
stackEntry->next = hashEntry->writeStateStack;

View File

@ -66,12 +66,12 @@ typedef enum
* a cstore file. To resolve these values, we first check foreign table's options,
* and if not present, we then fall back to the default values specified above.
*/
typedef struct CStoreOptions
typedef struct ColumnarOptions
{
CompressionType compressionType;
uint64 stripeRowCount;
uint32 blockRowCount;
} CStoreOptions;
CompressionType compressionType;
} ColumnarOptions;
/*
@ -94,9 +94,7 @@ typedef struct StripeMetadata
typedef struct DataFileMetadata
{
List *stripeMetadataList;
uint64 blockRowCount;
uint64 stripeRowCount;
CompressionType compression;
ColumnarOptions options;
} DataFileMetadata;
@ -285,12 +283,14 @@ extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer,
CompressionType compressionType);
extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType);
extern char * CompressionTypeStr(CompressionType type);
extern CStoreOptions * CStoreTableAMGetOptions(Oid relfilenode);
/* cstore_metadata_tables.c */
extern bool InitColumnarOptions(Oid regclass);
extern void SetColumnarOptions(Oid regclass, ColumnarOptions *options);
extern bool DeleteColumnarTableOptions(Oid regclass, bool missingOk);
extern bool ReadColumnarOptions(Oid regclass, ColumnarOptions *options);
extern void DeleteDataFileMetadataRowIfExists(Oid relfilenode);
extern void InitCStoreDataFileMetadata(Oid relfilenode, int blockRowCount, int
stripeRowCount, CompressionType compression);
extern void InitCStoreDataFileMetadata(Oid relfilenode);
extern void UpdateCStoreDataFileMetadata(Oid relfilenode, int blockRowCount, int
stripeRowCount, CompressionType compression);
extern DataFileMetadata * ReadDataFileMetadata(Oid relfilenode, bool missingOk);
@ -307,7 +307,7 @@ extern StripeSkipList * ReadStripeSkipList(Oid relfilenode, uint64 stripe,
/* write_state_management.c */
extern TableWriteState * cstore_init_write_state(RelFileNode relfilenode, TupleDesc
extern TableWriteState * cstore_init_write_state(Relation relation, TupleDesc
tupdesc,
SubTransactionId currentSubXid);
extern void FlushWriteStateForRelfilenode(Oid relfilenode, SubTransactionId

View File

@ -22,7 +22,37 @@ SELECT * FROM t_view a ORDER BY a;
2 | 54 | 3
(3 rows)
-- show columnar options for materialized view
SELECT * FROM cstore.options
WHERE regclass = 't_view'::regclass;
regclass | block_row_count | stripe_row_count | compression
---------------------------------------------------------------------
t_view | 10000 | 150000 | none
(1 row)
-- show we can set options on a materialized view
SELECT alter_columnar_table_set('t_view', compression => 'pglz');
alter_columnar_table_set
---------------------------------------------------------------------
(1 row)
SELECT * FROM cstore.options
WHERE regclass = 't_view'::regclass;
regclass | block_row_count | stripe_row_count | compression
---------------------------------------------------------------------
t_view | 10000 | 150000 | pglz
(1 row)
REFRESH MATERIALIZED VIEW t_view;
-- verify options have not been changed
SELECT * FROM cstore.options
WHERE regclass = 't_view'::regclass;
regclass | block_row_count | stripe_row_count | compression
---------------------------------------------------------------------
t_view | 10000 | 150000 | pglz
(1 row)
SELECT * FROM t_view a ORDER BY a;
a | bsum | cnt
---------------------------------------------------------------------

View File

@ -3,7 +3,7 @@ SET search_path TO am_tableoptions;
CREATE TABLE table_options (a int) USING columnar;
INSERT INTO table_options SELECT generate_series(1,100);
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
regclass | block_row_count | stripe_row_count | compression
---------------------------------------------------------------------
@ -18,7 +18,7 @@ SELECT alter_columnar_table_set('table_options', compression => 'pglz');
(1 row)
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
regclass | block_row_count | stripe_row_count | compression
---------------------------------------------------------------------
@ -33,7 +33,7 @@ SELECT alter_columnar_table_set('table_options', block_row_count => 10);
(1 row)
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
regclass | block_row_count | stripe_row_count | compression
---------------------------------------------------------------------
@ -48,7 +48,7 @@ SELECT alter_columnar_table_set('table_options', stripe_row_count => 100);
(1 row)
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
regclass | block_row_count | stripe_row_count | compression
---------------------------------------------------------------------
@ -58,7 +58,7 @@ WHERE regclass = 'table_options'::regclass;
-- VACUUM FULL creates a new table, make sure it copies settings from the table you are vacuuming
VACUUM FULL table_options;
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
regclass | block_row_count | stripe_row_count | compression
---------------------------------------------------------------------
@ -73,7 +73,46 @@ SELECT alter_columnar_table_set('table_options', stripe_row_count => 1000, block
(1 row)
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
regclass | block_row_count | stripe_row_count | compression
---------------------------------------------------------------------
table_options | 100 | 1000 | none
(1 row)
-- make sure table options are not changed when VACUUM a table
VACUUM table_options;
-- show table_options settings
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
regclass | block_row_count | stripe_row_count | compression
---------------------------------------------------------------------
table_options | 100 | 1000 | none
(1 row)
-- make sure table options are not changed when VACUUM FULL a table
VACUUM FULL table_options;
-- show table_options settings
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
regclass | block_row_count | stripe_row_count | compression
---------------------------------------------------------------------
table_options | 100 | 1000 | none
(1 row)
-- make sure table options are not changed when truncating a table
TRUNCATE table_options;
-- show table_options settings
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
regclass | block_row_count | stripe_row_count | compression
---------------------------------------------------------------------
table_options | 100 | 1000 | none
(1 row)
ALTER TABLE table_options ALTER COLUMN a TYPE bigint;
-- show table_options settings
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
regclass | block_row_count | stripe_row_count | compression
---------------------------------------------------------------------
@ -86,7 +125,7 @@ SET cstore.stripe_row_count TO 10000;
SET cstore.compression TO 'pglz';
-- verify setting the GUC's didn't change the settings
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
regclass | block_row_count | stripe_row_count | compression
---------------------------------------------------------------------
@ -100,7 +139,7 @@ SELECT alter_columnar_table_reset('table_options', block_row_count => true);
(1 row)
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
regclass | block_row_count | stripe_row_count | compression
---------------------------------------------------------------------
@ -114,7 +153,7 @@ SELECT alter_columnar_table_reset('table_options', stripe_row_count => true);
(1 row)
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
regclass | block_row_count | stripe_row_count | compression
---------------------------------------------------------------------
@ -128,7 +167,7 @@ SELECT alter_columnar_table_reset('table_options', compression => true);
(1 row)
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
regclass | block_row_count | stripe_row_count | compression
---------------------------------------------------------------------
@ -140,7 +179,7 @@ SET cstore.block_row_count TO 10000;
SET cstore.stripe_row_count TO 100000;
SET cstore.compression TO 'none';
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
regclass | block_row_count | stripe_row_count | compression
---------------------------------------------------------------------
@ -158,7 +197,7 @@ SELECT alter_columnar_table_reset(
(1 row)
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
regclass | block_row_count | stripe_row_count | compression
---------------------------------------------------------------------
@ -175,5 +214,13 @@ ERROR: table not_a_cstore_table is not a cstore table
-- verify you can't use a compression that is not known
SELECT alter_columnar_table_set('table_options', compression => 'foobar');
ERROR: unknown compression type for cstore table: foobar
-- verify options are removed when table is dropped
DROP TABLE table_options;
-- we expect no entries in çstore.options for anything not found int pg_class
SELECT * FROM cstore.options o WHERE o.regclass NOT IN (SELECT oid FROM pg_class);
regclass | block_row_count | stripe_row_count | compression
---------------------------------------------------------------------
(0 rows)
SET client_min_messages TO warning;
DROP SCHEMA am_tableoptions CASCADE;

View File

@ -486,7 +486,7 @@ SELECT * FROM print_extension_changes();
| table cstore.cstore_data_files
| table cstore.cstore_skipnodes
| table cstore.cstore_stripes
| view cstore.columnar_options
| table cstore.options
(10 rows)
DROP TABLE prev_objects, extension_diff;

View File

@ -482,7 +482,7 @@ SELECT * FROM print_extension_changes();
| table cstore.cstore_data_files
| table cstore.cstore_skipnodes
| table cstore.cstore_stripes
| view cstore.columnar_options
| table cstore.options
(6 rows)
DROP TABLE prev_objects, extension_diff;

View File

@ -194,6 +194,7 @@ ORDER BY 1;
table cstore.cstore_data_files
table cstore.cstore_skipnodes
table cstore.cstore_stripes
table cstore.options
table pg_dist_authinfo
table pg_dist_colocation
table pg_dist_local_group
@ -215,7 +216,6 @@ ORDER BY 1;
view citus_shards_on_worker
view citus_stat_statements
view citus_worker_stat_activity
view cstore.columnar_options
view pg_dist_shard_placement
(201 rows)

View File

@ -190,6 +190,7 @@ ORDER BY 1;
table cstore.cstore_data_files
table cstore.cstore_skipnodes
table cstore.cstore_stripes
table cstore.options
table pg_dist_authinfo
table pg_dist_colocation
table pg_dist_local_group
@ -211,7 +212,6 @@ ORDER BY 1;
view citus_shards_on_worker
view citus_stat_statements
view citus_worker_stat_activity
view cstore.columnar_options
view pg_dist_shard_placement
(197 rows)

View File

@ -15,8 +15,21 @@ INSERT INTO t SELECT floor(i / 4), 2 * i FROM generate_series(11, 20) i;
SELECT * FROM t_view a ORDER BY a;
-- show columnar options for materialized view
SELECT * FROM cstore.options
WHERE regclass = 't_view'::regclass;
-- show we can set options on a materialized view
SELECT alter_columnar_table_set('t_view', compression => 'pglz');
SELECT * FROM cstore.options
WHERE regclass = 't_view'::regclass;
REFRESH MATERIALIZED VIEW t_view;
-- verify options have not been changed
SELECT * FROM cstore.options
WHERE regclass = 't_view'::regclass;
SELECT * FROM t_view a ORDER BY a;
-- verify that we have created metadata entries for the materialized view

View File

@ -5,42 +5,65 @@ CREATE TABLE table_options (a int) USING columnar;
INSERT INTO table_options SELECT generate_series(1,100);
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
-- test changing the compression
SELECT alter_columnar_table_set('table_options', compression => 'pglz');
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
-- test changing the block_row_count
SELECT alter_columnar_table_set('table_options', block_row_count => 10);
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
-- test changing the block_row_count
SELECT alter_columnar_table_set('table_options', stripe_row_count => 100);
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
-- VACUUM FULL creates a new table, make sure it copies settings from the table you are vacuuming
VACUUM FULL table_options;
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
-- set all settings at the same time
SELECT alter_columnar_table_set('table_options', stripe_row_count => 1000, block_row_count => 100, compression => 'none');
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
-- make sure table options are not changed when VACUUM a table
VACUUM table_options;
-- show table_options settings
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
-- make sure table options are not changed when VACUUM FULL a table
VACUUM FULL table_options;
-- show table_options settings
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
-- make sure table options are not changed when truncating a table
TRUNCATE table_options;
-- show table_options settings
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
ALTER TABLE table_options ALTER COLUMN a TYPE bigint;
-- show table_options settings
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
-- reset settings one by one to the version of the GUC's
@ -50,24 +73,24 @@ SET cstore.compression TO 'pglz';
-- verify setting the GUC's didn't change the settings
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
SELECT alter_columnar_table_reset('table_options', block_row_count => true);
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
SELECT alter_columnar_table_reset('table_options', stripe_row_count => true);
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
SELECT alter_columnar_table_reset('table_options', compression => true);
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
-- verify resetting all settings at once work
@ -76,7 +99,7 @@ SET cstore.stripe_row_count TO 100000;
SET cstore.compression TO 'none';
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
SELECT alter_columnar_table_reset(
@ -86,7 +109,7 @@ SELECT alter_columnar_table_reset(
compression => true);
-- show table_options settings
SELECT * FROM cstore.columnar_options
SELECT * FROM cstore.options
WHERE regclass = 'table_options'::regclass;
-- verify edge cases
@ -98,5 +121,10 @@ SELECT alter_columnar_table_reset('not_a_cstore_table', compression => true);
-- verify you can't use a compression that is not known
SELECT alter_columnar_table_set('table_options', compression => 'foobar');
-- verify options are removed when table is dropped
DROP TABLE table_options;
-- we expect no entries in çstore.options for anything not found int pg_class
SELECT * FROM cstore.options o WHERE o.regclass NOT IN (SELECT oid FROM pg_class);
SET client_min_messages TO warning;
DROP SCHEMA am_tableoptions CASCADE;