Merge pull request #1 from citusdata/use_metadata_tables

Move StripeFooter to metadata tables.
merge-cstore-pykello
Hadi Moshayedi 2020-09-07 15:33:26 -07:00 committed by GitHub
commit 4b1e80a19f
12 changed files with 283 additions and 160 deletions

5
.gitignore vendored
View File

@ -53,5 +53,10 @@
/expected/create.out /expected/create.out
/expected/data_types.out /expected/data_types.out
/expected/load.out /expected/load.out
/results/*
/.deps/*
/regression.diffs
/regression.out
.vscode
*.pb-c.* *.pb-c.*

View File

@ -5,10 +5,11 @@
MODULE_big = cstore_fdw MODULE_big = cstore_fdw
PG_CPPFLAGS = --std=c99 PG_CPPFLAGS = -std=c11
SHLIB_LINK = -lprotobuf-c SHLIB_LINK = -lprotobuf-c
OBJS = cstore.pb-c.o cstore.o cstore_fdw.o cstore_writer.o cstore_reader.o \ OBJS = cstore.pb-c.o cstore.o cstore_fdw.o cstore_writer.o cstore_reader.o \
cstore_metadata_serialization.o cstore_compression.o mod.o cstore_metadata_serialization.o cstore_compression.o mod.o \
cstore_metadata_tables.o
EXTENSION = cstore_fdw EXTENSION = cstore_fdw
DATA = cstore_fdw--1.7.sql cstore_fdw--1.6--1.7.sql cstore_fdw--1.5--1.6.sql cstore_fdw--1.4--1.5.sql \ DATA = cstore_fdw--1.7.sql cstore_fdw--1.6--1.7.sql cstore_fdw--1.5--1.6.sql cstore_fdw--1.4--1.5.sql \

View File

@ -131,7 +131,7 @@ InitializeCStoreTableFile(Oid relationId, Relation relation, CStoreOptions *csto
* Initialize state to write to the cstore file. This creates an * Initialize state to write to the cstore file. This creates an
* empty data file and a valid footer file for the table. * empty data file and a valid footer file for the table.
*/ */
writeState = CStoreBeginWrite(cstoreOptions->filename, writeState = CStoreBeginWrite(relationId, cstoreOptions->filename,
cstoreOptions->compressionType, cstoreOptions->stripeRowCount, cstoreOptions->compressionType, cstoreOptions->stripeRowCount,
cstoreOptions->blockRowCount, tupleDescriptor); cstoreOptions->blockRowCount, tupleDescriptor);
CStoreEndWrite(writeState); CStoreEndWrite(writeState);

View File

@ -89,6 +89,7 @@ typedef struct StripeMetadata
uint64 skipListLength; uint64 skipListLength;
uint64 dataLength; uint64 dataLength;
uint64 footerLength; uint64 footerLength;
uint64 id;
} StripeMetadata; } StripeMetadata;
@ -213,6 +214,8 @@ typedef struct StripeFooter
/* TableReadState represents state of a cstore file read operation. */ /* TableReadState represents state of a cstore file read operation. */
typedef struct TableReadState typedef struct TableReadState
{ {
Oid relationId;
FILE *tableFile; FILE *tableFile;
TableFooter *tableFooter; TableFooter *tableFooter;
TupleDesc tupleDescriptor; TupleDesc tupleDescriptor;
@ -238,6 +241,7 @@ typedef struct TableReadState
/* TableWriteState represents state of a cstore file write operation. */ /* TableWriteState represents state of a cstore file write operation. */
typedef struct TableWriteState typedef struct TableWriteState
{ {
Oid relationId;
FILE *tableFile; FILE *tableFile;
TableFooter *tableFooter; TableFooter *tableFooter;
StringInfo tableFooterFilename; StringInfo tableFooterFilename;
@ -248,6 +252,7 @@ typedef struct TableWriteState
Relation relation; Relation relation;
MemoryContext stripeWriteContext; MemoryContext stripeWriteContext;
uint64 currentStripeId;
StripeBuffers *stripeBuffers; StripeBuffers *stripeBuffers;
StripeSkipList *stripeSkipList; StripeSkipList *stripeSkipList;
uint32 stripeMaxRowCount; uint32 stripeMaxRowCount;
@ -270,7 +275,8 @@ extern void RemoveCStoreDatabaseDirectory(Oid databaseOid);
extern void DeleteCStoreTableFiles(char *filename); extern void DeleteCStoreTableFiles(char *filename);
/* Function declarations for writing to a cstore file */ /* Function declarations for writing to a cstore file */
extern TableWriteState * CStoreBeginWrite(const char *filename, extern TableWriteState * CStoreBeginWrite(Oid relationId,
const char *filename,
CompressionType compressionType, CompressionType compressionType,
uint64 stripeMaxRowCount, uint64 stripeMaxRowCount,
uint32 blockRowCount, uint32 blockRowCount,
@ -280,7 +286,8 @@ extern void CStoreWriteRow(TableWriteState *state, Datum *columnValues,
extern void CStoreEndWrite(TableWriteState * state); extern void CStoreEndWrite(TableWriteState * state);
/* Function declarations for reading from a cstore file */ /* Function declarations for reading from a cstore file */
extern TableReadState * CStoreBeginRead(const char *filename, TupleDesc tupleDescriptor, extern TableReadState * CStoreBeginRead(Oid relationId, const char *filename,
TupleDesc tupleDescriptor,
List *projectedColumnList, List *qualConditions); List *projectedColumnList, List *qualConditions);
extern TableFooter * CStoreReadFooter(StringInfo tableFooterFilename); extern TableFooter * CStoreReadFooter(StringInfo tableFooterFilename);
extern bool CStoreReadFinished(TableReadState *state); extern bool CStoreReadFinished(TableReadState *state);
@ -295,10 +302,14 @@ extern ColumnBlockData ** CreateEmptyBlockDataArray(uint32 columnCount, bool *co
uint32 blockRowCount); uint32 blockRowCount);
extern void FreeColumnBlockDataArray(ColumnBlockData **blockDataArray, extern void FreeColumnBlockDataArray(ColumnBlockData **blockDataArray,
uint32 columnCount); uint32 columnCount);
extern uint64 CStoreTableRowCount(const char *filename); extern uint64 CStoreTableRowCount(Oid relid, const char *filename);
extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer, extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer,
CompressionType compressionType); CompressionType compressionType);
extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType); extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType);
/* cstore_metadata_tables.c */
extern void SaveStripeFooter(Oid relid, uint64 stripe, StripeFooter *footer);
extern StripeFooter * ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount);
#endif /* CSTORE_H */ #endif /* CSTORE_H */

View File

@ -23,17 +23,12 @@ message ColumnBlockSkipList {
repeated ColumnBlockSkipNode blockSkipNodeArray = 1; repeated ColumnBlockSkipNode blockSkipNodeArray = 1;
} }
message StripeFooter {
repeated uint64 skipListSizeArray = 1;
repeated uint64 existsSizeArray = 2;
repeated uint64 valueSizeArray = 3;
}
message StripeMetadata { message StripeMetadata {
optional uint64 fileOffset = 1; optional uint64 fileOffset = 1;
optional uint64 skipListLength = 2; optional uint64 skipListLength = 2;
optional uint64 dataLength = 3; optional uint64 dataLength = 3;
optional uint64 footerLength = 4; optional uint64 footerLength = 4;
optional uint64 id = 5;
} }
message TableFooter { message TableFooter {

View File

@ -58,3 +58,17 @@ CREATE EVENT TRIGGER cstore_drop_event
ON SQL_DROP ON SQL_DROP
EXECUTE PROCEDURE cstore_drop_trigger(); EXECUTE PROCEDURE cstore_drop_trigger();
CREATE TABLE cstore_stripe_attr (
relid oid,
stripe bigint,
attr int,
exists_size bigint,
value_size bigint,
skiplist_size bigint
) WITH (user_catalog_table = true);
CREATE INDEX cstore_stripe_attr_idx
ON cstore_stripe_attr
USING BTREE(relid, stripe, attr);
ALTER TABLE cstore_stripe_attr SET SCHEMA pg_catalog;

View File

@ -152,7 +152,7 @@ static ForeignScan * CStoreGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel
Oid foreignTableId, ForeignPath *bestPath, Oid foreignTableId, ForeignPath *bestPath,
List *targetList, List *scanClauses); List *targetList, List *scanClauses);
#endif #endif
static double TupleCountEstimate(RelOptInfo *baserel, const char *filename); static double TupleCountEstimate(Oid relid, RelOptInfo *baserel, const char *filename);
static BlockNumber PageCount(const char *filename); static BlockNumber PageCount(const char *filename);
static List * ColumnList(RelOptInfo *baserel, Oid foreignTableId); static List * ColumnList(RelOptInfo *baserel, Oid foreignTableId);
static void CStoreExplainForeignScan(ForeignScanState *scanState, static void CStoreExplainForeignScan(ForeignScanState *scanState,
@ -602,7 +602,8 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString)
#endif #endif
/* init state to write to the cstore file */ /* init state to write to the cstore file */
writeState = CStoreBeginWrite(cstoreOptions->filename, writeState = CStoreBeginWrite(relationId,
cstoreOptions->filename,
cstoreOptions->compressionType, cstoreOptions->compressionType,
cstoreOptions->stripeRowCount, cstoreOptions->stripeRowCount,
cstoreOptions->blockRowCount, cstoreOptions->blockRowCount,
@ -1414,6 +1415,7 @@ ValidateForeignTableOptions(char *filename, char *compressionTypeString,
static char * static char *
CStoreDefaultFilePath(Oid foreignTableId) CStoreDefaultFilePath(Oid foreignTableId)
{ {
StringInfo cstoreFilePath = NULL;
Relation relation = relation_open(foreignTableId, AccessShareLock); Relation relation = relation_open(foreignTableId, AccessShareLock);
RelFileNode relationFileNode = relation->rd_node; RelFileNode relationFileNode = relation->rd_node;
Oid databaseOid = relationFileNode.dbNode; Oid databaseOid = relationFileNode.dbNode;
@ -1429,7 +1431,7 @@ CStoreDefaultFilePath(Oid foreignTableId)
} }
StringInfo cstoreFilePath = makeStringInfo(); cstoreFilePath = makeStringInfo();
appendStringInfo(cstoreFilePath, "%s/%s/%u/%u", DataDir, CSTORE_FDW_NAME, appendStringInfo(cstoreFilePath, "%s/%s/%u/%u", DataDir, CSTORE_FDW_NAME,
databaseOid, relationFileOid); databaseOid, relationFileOid);
@ -1445,7 +1447,7 @@ static void
CStoreGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId) CStoreGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId)
{ {
CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId); CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId);
double tupleCountEstimate = TupleCountEstimate(baserel, cstoreOptions->filename); double tupleCountEstimate = TupleCountEstimate(foreignTableId, baserel, cstoreOptions->filename);
double rowSelectivity = clauselist_selectivity(root, baserel->baserestrictinfo, double rowSelectivity = clauselist_selectivity(root, baserel->baserestrictinfo,
0, JOIN_INNER, NULL); 0, JOIN_INNER, NULL);
@ -1492,7 +1494,7 @@ CStoreGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId
double queryPageCount = relationPageCount * queryColumnRatio; double queryPageCount = relationPageCount * queryColumnRatio;
double totalDiskAccessCost = seq_page_cost * queryPageCount; double totalDiskAccessCost = seq_page_cost * queryPageCount;
double tupleCountEstimate = TupleCountEstimate(baserel, cstoreOptions->filename); double tupleCountEstimate = TupleCountEstimate(foreignTableId, baserel, cstoreOptions->filename);
/* /*
* We estimate costs almost the same way as cost_seqscan(), thus assuming * We estimate costs almost the same way as cost_seqscan(), thus assuming
@ -1597,7 +1599,7 @@ CStoreGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId,
* file. * file.
*/ */
static double static double
TupleCountEstimate(RelOptInfo *baserel, const char *filename) TupleCountEstimate(Oid relid, RelOptInfo *baserel, const char *filename)
{ {
double tupleCountEstimate = 0.0; double tupleCountEstimate = 0.0;
@ -1616,7 +1618,7 @@ TupleCountEstimate(RelOptInfo *baserel, const char *filename)
} }
else else
{ {
tupleCountEstimate = (double) CStoreTableRowCount(filename); tupleCountEstimate = (double) CStoreTableRowCount(relid, filename);
} }
return tupleCountEstimate; return tupleCountEstimate;
@ -1809,8 +1811,8 @@ CStoreBeginForeignScan(ForeignScanState *scanState, int executorFlags)
whereClauseList = foreignScan->scan.plan.qual; whereClauseList = foreignScan->scan.plan.qual;
columnList = (List *) linitial(foreignPrivateList); columnList = (List *) linitial(foreignPrivateList);
readState = CStoreBeginRead(cstoreOptions->filename, tupleDescriptor, readState = CStoreBeginRead(foreignTableId, cstoreOptions->filename,
columnList, whereClauseList); tupleDescriptor, columnList, whereClauseList);
scanState->fdw_state = (void *) readState; scanState->fdw_state = (void *) readState;
} }
@ -2161,7 +2163,8 @@ CStoreBeginForeignInsert(ModifyTableState *modifyTableState, ResultRelInfo *rela
cstoreOptions = CStoreGetOptions(foreignTableOid); cstoreOptions = CStoreGetOptions(foreignTableOid);
tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc); tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc);
writeState = CStoreBeginWrite(cstoreOptions->filename, writeState = CStoreBeginWrite(foreignTableOid,
cstoreOptions->filename,
cstoreOptions->compressionType, cstoreOptions->compressionType,
cstoreOptions->stripeRowCount, cstoreOptions->stripeRowCount,
cstoreOptions->blockRowCount, cstoreOptions->blockRowCount,

View File

@ -95,6 +95,8 @@ SerializeTableFooter(TableFooter *tableFooter)
protobufStripeMetadata->datalength = stripeMetadata->dataLength; protobufStripeMetadata->datalength = stripeMetadata->dataLength;
protobufStripeMetadata->has_footerlength = true; protobufStripeMetadata->has_footerlength = true;
protobufStripeMetadata->footerlength = stripeMetadata->footerLength; protobufStripeMetadata->footerlength = stripeMetadata->footerLength;
protobufStripeMetadata->has_id = true;
protobufStripeMetadata->id = stripeMetadata->id;
stripeMetadataArray[stripeIndex] = protobufStripeMetadata; stripeMetadataArray[stripeIndex] = protobufStripeMetadata;
stripeIndex++; stripeIndex++;
@ -118,38 +120,6 @@ SerializeTableFooter(TableFooter *tableFooter)
} }
/*
 * SerializeStripeFooter packs the three per-column size arrays of the given
 * stripe footer into a protobuf StripeFooter message and returns the packed
 * bytes wrapped in a StringInfo.
 */
StringInfo
SerializeStripeFooter(StripeFooter *stripeFooter)
{
	Protobuf__StripeFooter message = PROTOBUF__STRIPE_FOOTER__INIT;
	StringInfo packedFooter = NULL;
	uint8 *packedBytes = NULL;
	uint32 packedLength = 0;

	/* every array is given columnCount entries */
	message.n_skiplistsizearray = stripeFooter->columnCount;
	message.n_existssizearray = stripeFooter->columnCount;
	message.n_valuesizearray = stripeFooter->columnCount;

	/* the message borrows the footer's arrays; nothing is copied here */
	message.skiplistsizearray = (uint64_t *) stripeFooter->skipListSizeArray;
	message.existssizearray = (uint64_t *) stripeFooter->existsSizeArray;
	message.valuesizearray = (uint64_t *) stripeFooter->valueSizeArray;

	packedLength = protobuf__stripe_footer__get_packed_size(&message);
	packedBytes = palloc0(packedLength);
	protobuf__stripe_footer__pack(&message, packedBytes);

	/* build the StringInfo by hand so that it points straight at the bytes */
	packedFooter = palloc0(sizeof(StringInfoData));
	packedFooter->data = (char *) packedBytes;
	packedFooter->len = packedLength;
	packedFooter->maxlen = packedLength;

	return packedFooter;
}
/* /*
* SerializeColumnSkipList serializes a column skip list, where the column skip * SerializeColumnSkipList serializes a column skip list, where the column skip
* list includes all block skip nodes for that column. The function then returns * list includes all block skip nodes for that column. The function then returns
@ -315,6 +285,7 @@ DeserializeTableFooter(StringInfo buffer)
stripeMetadata->skipListLength = protobufStripeMetadata->skiplistlength; stripeMetadata->skipListLength = protobufStripeMetadata->skiplistlength;
stripeMetadata->dataLength = protobufStripeMetadata->datalength; stripeMetadata->dataLength = protobufStripeMetadata->datalength;
stripeMetadata->footerLength = protobufStripeMetadata->footerlength; stripeMetadata->footerLength = protobufStripeMetadata->footerlength;
stripeMetadata->id = protobufStripeMetadata->id;
stripeMetadataList = lappend(stripeMetadataList, stripeMetadata); stripeMetadataList = lappend(stripeMetadataList, stripeMetadata);
} }
@ -329,59 +300,6 @@ DeserializeTableFooter(StringInfo buffer)
} }
/*
 * DeserializeStripeFooter unpacks a protobuf StripeFooter message from the
 * given buffer and copies its contents into a newly allocated StripeFooter.
 * An ERROR is raised when the buffer cannot be unpacked or when the three
 * size arrays do not all have the same number of entries.
 */
StripeFooter *
DeserializeStripeFooter(StringInfo buffer)
{
	Protobuf__StripeFooter *message = NULL;
	StripeFooter *result = NULL;
	uint32 columnCount = 0;
	uint64 arrayByteCount = 0;

	message = protobuf__stripe_footer__unpack(NULL, buffer->len,
											  (uint8 *) buffer->data);
	if (message == NULL)
	{
		ereport(ERROR, (errmsg("could not unpack column store"),
						errdetail("invalid stripe footer buffer")));
	}

	/* the skip list array length is taken as the stripe's column count */
	columnCount = message->n_skiplistsizearray;
	if (!(message->n_existssizearray == columnCount &&
		  message->n_valuesizearray == columnCount))
	{
		ereport(ERROR, (errmsg("could not unpack column store"),
						errdetail("stripe size array lengths don't match")));
	}

	arrayByteCount = columnCount * sizeof(uint64);

	result = palloc0(sizeof(StripeFooter));
	result->columnCount = columnCount;
	result->skipListSizeArray = palloc0(arrayByteCount);
	result->existsSizeArray = palloc0(arrayByteCount);
	result->valueSizeArray = palloc0(arrayByteCount);

	/* copy the arrays out before the unpacked message is released */
	memcpy(result->skipListSizeArray, message->skiplistsizearray, arrayByteCount);
	memcpy(result->existsSizeArray, message->existssizearray, arrayByteCount);
	memcpy(result->valueSizeArray, message->valuesizearray, arrayByteCount);

	protobuf__stripe_footer__free_unpacked(message, NULL);

	return result;
}
/* /*
* DeserializeBlockCount deserializes the given column skip list buffer and * DeserializeBlockCount deserializes the given column skip list buffer and
* returns the number of blocks in column skip list. * returns the number of blocks in column skip list.

View File

@ -17,7 +17,6 @@
/* Function declarations for metadata serialization */ /* Function declarations for metadata serialization */
extern StringInfo SerializePostScript(uint64 tableFooterLength); extern StringInfo SerializePostScript(uint64 tableFooterLength);
extern StringInfo SerializeTableFooter(TableFooter *tableFooter); extern StringInfo SerializeTableFooter(TableFooter *tableFooter);
extern StringInfo SerializeStripeFooter(StripeFooter *stripeFooter);
extern StringInfo SerializeColumnSkipList(ColumnBlockSkipNode *blockSkipNodeArray, extern StringInfo SerializeColumnSkipList(ColumnBlockSkipNode *blockSkipNodeArray,
uint32 blockCount, bool typeByValue, uint32 blockCount, bool typeByValue,
int typeLength); int typeLength);
@ -27,7 +26,6 @@ extern void DeserializePostScript(StringInfo buffer, uint64 *tableFooterLength);
extern TableFooter * DeserializeTableFooter(StringInfo buffer); extern TableFooter * DeserializeTableFooter(StringInfo buffer);
extern uint32 DeserializeBlockCount(StringInfo buffer); extern uint32 DeserializeBlockCount(StringInfo buffer);
extern uint32 DeserializeRowCount(StringInfo buffer); extern uint32 DeserializeRowCount(StringInfo buffer);
extern StripeFooter * DeserializeStripeFooter(StringInfo buffer);
extern ColumnBlockSkipNode * DeserializeColumnSkipList(StringInfo buffer, extern ColumnBlockSkipNode * DeserializeColumnSkipList(StringInfo buffer,
bool typeByValue, int typeLength, bool typeByValue, int typeLength,
uint32 blockCount); uint32 blockCount);

186
cstore_metadata_tables.c Normal file
View File

@ -0,0 +1,186 @@
/*-------------------------------------------------------------------------
*
* cstore_metadata_tables.c
*
* Copyright (c), Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "cstore.h"
#include "cstore_version_compat.h"
#include <sys/stat.h>
#include "access/nbtree.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_collation.h"
#include "commands/defrem.h"
#include "lib/stringinfo.h"
#include "optimizer/optimizer.h"
#include "port.h"
#include "storage/fd.h"
#include "utils/fmgroids.h"
#include "utils/memutils.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "cstore_metadata_serialization.h"
static Oid CStoreStripeAttrRelationId(void);
static Oid CStoreStripeAttrIndexRelationId(void);
static void InsertStripeAttrRow(Oid relid, uint64 stripe, AttrNumber attr,
uint64 existsSize, uint64 valuesSize,
uint64 skiplistSize);
/* constants for cstore_stripe_attr */
#define Natts_cstore_stripe_attr 6
#define Anum_cstore_stripe_attr_relid 1
#define Anum_cstore_stripe_attr_stripe 2
#define Anum_cstore_stripe_attr_attr 3
#define Anum_cstore_stripe_attr_exists_size 4
#define Anum_cstore_stripe_attr_value_size 5
#define Anum_cstore_stripe_attr_skiplist_size 6
/*
 * SaveStripeFooter stores the given StripeFooter as cstore_stripe_attr
 * records: one InsertStripeAttrRow() call per column, with attribute numbers
 * starting at 1.
 */
void
SaveStripeFooter(Oid relid, uint64 stripe, StripeFooter *footer)
{
	AttrNumber attributeNumber = 1;

	while (attributeNumber <= footer->columnCount)
	{
		/* the footer arrays are 0-based, attribute numbers are 1-based */
		int arrayIndex = attributeNumber - 1;

		InsertStripeAttrRow(relid, stripe, attributeNumber,
							footer->existsSizeArray[arrayIndex],
							footer->valueSizeArray[arrayIndex],
							footer->skipListSizeArray[arrayIndex]);

		attributeNumber++;
	}
}
/*
 * InsertStripeAttrRow adds a row to cstore_stripe_attr describing one column
 * (attr) of the given stripe of relation relid.
 *
 * Each datum is placed at the slot named by its Anum_cstore_stripe_attr_*
 * constant and is built with the constructor matching the column's declared
 * SQL type (oid, bigint, int, bigint, bigint, bigint). In particular "attr"
 * is declared as int, so it is stored as a 32-bit datum.
 */
static void
InsertStripeAttrRow(Oid relid, uint64 stripe, AttrNumber attr,
					uint64 existsSize, uint64 valuesSize,
					uint64 skiplistSize)
{
	bool nulls[Natts_cstore_stripe_attr] = { 0 };
	Datum values[Natts_cstore_stripe_attr] = {
		[Anum_cstore_stripe_attr_relid - 1] = ObjectIdGetDatum(relid),
		[Anum_cstore_stripe_attr_stripe - 1] = Int64GetDatum(stripe),
		[Anum_cstore_stripe_attr_attr - 1] = Int32GetDatum(attr),
		[Anum_cstore_stripe_attr_exists_size - 1] = Int64GetDatum(existsSize),
		[Anum_cstore_stripe_attr_value_size - 1] = Int64GetDatum(valuesSize),
		[Anum_cstore_stripe_attr_skiplist_size - 1] = Int64GetDatum(skiplistSize)
	};

	Oid cstoreStripeAttrOid = CStoreStripeAttrRelationId();
	Relation cstoreStripeAttrs = heap_open(cstoreStripeAttrOid, RowExclusiveLock);
	TupleDesc tupleDescriptor = RelationGetDescr(cstoreStripeAttrs);

	HeapTuple tuple = heap_form_tuple(tupleDescriptor, values, nulls);

	CatalogTupleInsert(cstoreStripeAttrs, tuple);

	/* advance the command counter so later commands can see the new row */
	CommandCounterIncrement();

	/* NoLock: the relation is closed without releasing the lock here */
	heap_close(cstoreStripeAttrs, NoLock);
}
/*
 * ReadStripeFooter returns a StripeFooter by reading relevant records from
 * cstore_stripe_attr.
 *
 * relid and stripe select the rows to read (index scan on relid, stripe).
 * relationColumnCount is the current column count of the relation; the
 * returned size arrays are allocated with that many zero-initialized entries.
 *
 * Raises an ERROR if a stored attribute number falls outside
 * [1, relationColumnCount], since it could not be mapped to an array slot.
 */
StripeFooter *
ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount)
{
	StripeFooter *footer = NULL;
	HeapTuple heapTuple = NULL;

	Oid cstoreStripeAttrOid = CStoreStripeAttrRelationId();
	Relation cstoreStripeAttrs = heap_open(cstoreStripeAttrOid, AccessShareLock);
	Relation index = index_open(CStoreStripeAttrIndexRelationId(), AccessShareLock);
	TupleDesc tupleDescriptor = RelationGetDescr(cstoreStripeAttrs);

	SysScanDesc scanDescriptor = NULL;
	ScanKeyData scanKey[2];

	/* relid is an oid column, so build its key with ObjectIdGetDatum */
	ScanKeyInit(&scanKey[0], Anum_cstore_stripe_attr_relid,
				BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relid));
	ScanKeyInit(&scanKey[1], Anum_cstore_stripe_attr_stripe,
				BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(stripe));

	scanDescriptor = systable_beginscan_ordered(cstoreStripeAttrs, index, NULL, 2,
												scanKey);

	footer = palloc0(sizeof(StripeFooter));
	footer->existsSizeArray = palloc0(relationColumnCount * sizeof(uint64));
	footer->valueSizeArray = palloc0(relationColumnCount * sizeof(uint64));
	footer->skipListSizeArray = palloc0(relationColumnCount * sizeof(uint64));

	/*
	 * Stripe can have fewer columns than the relation if ALTER TABLE happens
	 * after stripe is formed. So we calculate column count of a stripe as
	 * maximum attribute number for that stripe.
	 */
	footer->columnCount = 0;

	while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
	{
		Datum datumArray[Natts_cstore_stripe_attr];
		bool isNullArray[Natts_cstore_stripe_attr];
		int32 attr = 0;

		heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);

		/* "attr" is declared as int in cstore_stripe_attr, i.e. 32 bits */
		attr = DatumGetInt32(datumArray[Anum_cstore_stripe_attr_attr - 1]);

		/* validate before attr - 1 is used as an array index below */
		if (attr < 1 || attr > relationColumnCount)
		{
			ereport(ERROR, (errmsg("unexpected attribute %d for a relation with %d attrs",
								   attr, relationColumnCount)));
		}

		footer->columnCount = Max(footer->columnCount, (uint32) attr);

		footer->existsSizeArray[attr - 1] =
			DatumGetInt64(datumArray[Anum_cstore_stripe_attr_exists_size - 1]);
		footer->valueSizeArray[attr - 1] =
			DatumGetInt64(datumArray[Anum_cstore_stripe_attr_value_size - 1]);
		footer->skipListSizeArray[attr - 1] =
			DatumGetInt64(datumArray[Anum_cstore_stripe_attr_skiplist_size - 1]);
	}

	systable_endscan_ordered(scanDescriptor);

	/* NoLock: both relations are closed without releasing their locks here */
	index_close(index, NoLock);
	heap_close(cstoreStripeAttrs, NoLock);

	return footer;
}
/*
 * CStoreStripeAttrRelationId looks up the relation named "cstore_stripe_attr"
 * in the pg_catalog namespace and returns the result of that lookup.
 * TODO: should we cache this similar to citus?
 */
static Oid
CStoreStripeAttrRelationId(void)
{
	Oid stripeAttrRelationOid = get_relname_relid("cstore_stripe_attr",
												  PG_CATALOG_NAMESPACE);

	return stripeAttrRelationOid;
}
/*
 * CStoreStripeAttrIndexRelationId returns relation id of
 * cstore_stripe_attr_idx, looked up by name in the pg_catalog namespace.
 * TODO: should we cache this similar to citus?
 */
static Oid
CStoreStripeAttrIndexRelationId(void)
{
	return get_relname_relid("cstore_stripe_attr_idx", PG_CATALOG_NAMESPACE);
}

View File

@ -31,6 +31,7 @@
#include "storage/fd.h" #include "storage/fd.h"
#include "utils/memutils.h" #include "utils/memutils.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/rel.h"
#include "cstore.h" #include "cstore.h"
#include "cstore_metadata_serialization.h" #include "cstore_metadata_serialization.h"
@ -39,6 +40,7 @@
/* static function declarations */ /* static function declarations */
static StripeBuffers * LoadFilteredStripeBuffers(FILE *tableFile, static StripeBuffers * LoadFilteredStripeBuffers(FILE *tableFile,
StripeMetadata *stripeMetadata, StripeMetadata *stripeMetadata,
StripeFooter *stripeFooter,
TupleDesc tupleDescriptor, TupleDesc tupleDescriptor,
List *projectedColumnList, List *projectedColumnList,
List *whereClauseList); List *whereClauseList);
@ -51,8 +53,6 @@ static ColumnBuffers * LoadColumnBuffers(FILE *tableFile,
uint32 blockCount, uint64 existsFileOffset, uint32 blockCount, uint64 existsFileOffset,
uint64 valueFileOffset, uint64 valueFileOffset,
Form_pg_attribute attributeForm); Form_pg_attribute attributeForm);
static StripeFooter * LoadStripeFooter(FILE *tableFile, StripeMetadata *stripeMetadata,
uint32 columnCount);
static StripeSkipList * LoadStripeSkipList(FILE *tableFile, static StripeSkipList * LoadStripeSkipList(FILE *tableFile,
StripeMetadata *stripeMetadata, StripeMetadata *stripeMetadata,
StripeFooter *stripeFooter, StripeFooter *stripeFooter,
@ -86,7 +86,8 @@ static int64 FILESize(FILE *file);
static StringInfo ReadFromFile(FILE *file, uint64 offset, uint32 size); static StringInfo ReadFromFile(FILE *file, uint64 offset, uint32 size);
static void ResetUncompressedBlockData(ColumnBlockData **blockDataArray, static void ResetUncompressedBlockData(ColumnBlockData **blockDataArray,
uint32 columnCount); uint32 columnCount);
static uint64 StripeRowCount(FILE *tableFile, StripeMetadata *stripeMetadata); static uint64 StripeRowCount(Oid relid, FILE *tableFile, StripeMetadata *stripeMetadata);
static int RelationColumnCount(Oid relid);
/* /*
@ -94,7 +95,7 @@ static uint64 StripeRowCount(FILE *tableFile, StripeMetadata *stripeMetadata);
* read handle that's used during reading rows and finishing the read operation. * read handle that's used during reading rows and finishing the read operation.
*/ */
TableReadState * TableReadState *
CStoreBeginRead(const char *filename, TupleDesc tupleDescriptor, CStoreBeginRead(Oid relationId, const char *filename, TupleDesc tupleDescriptor,
List *projectedColumnList, List *whereClauseList) List *projectedColumnList, List *whereClauseList)
{ {
TableReadState *readState = NULL; TableReadState *readState = NULL;
@ -136,6 +137,7 @@ CStoreBeginRead(const char *filename, TupleDesc tupleDescriptor,
tableFooter->blockRowCount); tableFooter->blockRowCount);
readState = palloc0(sizeof(TableReadState)); readState = palloc0(sizeof(TableReadState));
readState->relationId = relationId;
readState->tableFile = tableFile; readState->tableFile = tableFile;
readState->tableFooter = tableFooter; readState->tableFooter = tableFooter;
readState->projectedColumnList = projectedColumnList; readState->projectedColumnList = projectedColumnList;
@ -247,6 +249,7 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu
StripeMetadata *stripeMetadata = NULL; StripeMetadata *stripeMetadata = NULL;
List *stripeMetadataList = tableFooter->stripeMetadataList; List *stripeMetadataList = tableFooter->stripeMetadataList;
uint32 stripeCount = list_length(stripeMetadataList); uint32 stripeCount = list_length(stripeMetadataList);
StripeFooter *stripeFooter = NULL;
/* if we have read all stripes, return false */ /* if we have read all stripes, return false */
if (readState->readStripeCount == stripeCount) if (readState->readStripeCount == stripeCount)
@ -258,7 +261,11 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu
MemoryContextReset(readState->stripeReadContext); MemoryContextReset(readState->stripeReadContext);
stripeMetadata = list_nth(stripeMetadataList, readState->readStripeCount); stripeMetadata = list_nth(stripeMetadataList, readState->readStripeCount);
stripeFooter = ReadStripeFooter(readState->relationId,
stripeMetadata->id,
readState->tupleDescriptor->natts);
stripeBuffers = LoadFilteredStripeBuffers(readState->tableFile, stripeMetadata, stripeBuffers = LoadFilteredStripeBuffers(readState->tableFile, stripeMetadata,
stripeFooter,
readState->tupleDescriptor, readState->tupleDescriptor,
readState->projectedColumnList, readState->projectedColumnList,
readState->whereClauseList); readState->whereClauseList);
@ -396,7 +403,7 @@ FreeColumnBlockDataArray(ColumnBlockData **blockDataArray, uint32 columnCount)
/* CStoreTableRowCount returns the exact row count of a table using skiplists */ /* CStoreTableRowCount returns the exact row count of a table using skiplists */
uint64 uint64
CStoreTableRowCount(const char *filename) CStoreTableRowCount(Oid relid, const char *filename)
{ {
TableFooter *tableFooter = NULL; TableFooter *tableFooter = NULL;
FILE *tableFile; FILE *tableFile;
@ -422,7 +429,7 @@ CStoreTableRowCount(const char *filename)
foreach(stripeMetadataCell, tableFooter->stripeMetadataList) foreach(stripeMetadataCell, tableFooter->stripeMetadataList)
{ {
StripeMetadata *stripeMetadata = (StripeMetadata *) lfirst(stripeMetadataCell); StripeMetadata *stripeMetadata = (StripeMetadata *) lfirst(stripeMetadataCell);
totalRowCount += StripeRowCount(tableFile, stripeMetadata); totalRowCount += StripeRowCount(relid, tableFile, stripeMetadata);
} }
FreeFile(tableFile); FreeFile(tableFile);
@ -436,20 +443,13 @@ CStoreTableRowCount(const char *filename)
* skip list, and returns number of rows for given stripe. * skip list, and returns number of rows for given stripe.
*/ */
static uint64 static uint64
StripeRowCount(FILE *tableFile, StripeMetadata *stripeMetadata) StripeRowCount(Oid relid, FILE *tableFile, StripeMetadata *stripeMetadata)
{ {
uint64 rowCount = 0; uint64 rowCount = 0;
StripeFooter *stripeFooter = NULL;
StringInfo footerBuffer = NULL;
StringInfo firstColumnSkipListBuffer = NULL; StringInfo firstColumnSkipListBuffer = NULL;
uint64 footerOffset = 0;
footerOffset += stripeMetadata->fileOffset; StripeFooter * stripeFooter = ReadStripeFooter(relid, stripeMetadata->id,
footerOffset += stripeMetadata->skipListLength; RelationColumnCount(relid));
footerOffset += stripeMetadata->dataLength;
footerBuffer = ReadFromFile(tableFile, footerOffset, stripeMetadata->footerLength);
stripeFooter = DeserializeStripeFooter(footerBuffer);
firstColumnSkipListBuffer = ReadFromFile(tableFile, stripeMetadata->fileOffset, firstColumnSkipListBuffer = ReadFromFile(tableFile, stripeMetadata->fileOffset,
stripeFooter->skipListSizeArray[0]); stripeFooter->skipListSizeArray[0]);
@ -466,8 +466,8 @@ StripeRowCount(FILE *tableFile, StripeMetadata *stripeMetadata)
*/ */
static StripeBuffers * static StripeBuffers *
LoadFilteredStripeBuffers(FILE *tableFile, StripeMetadata *stripeMetadata, LoadFilteredStripeBuffers(FILE *tableFile, StripeMetadata *stripeMetadata,
TupleDesc tupleDescriptor, List *projectedColumnList, StripeFooter *stripeFooter, TupleDesc tupleDescriptor,
List *whereClauseList) List *projectedColumnList, List *whereClauseList)
{ {
StripeBuffers *stripeBuffers = NULL; StripeBuffers *stripeBuffers = NULL;
ColumnBuffers **columnBuffersArray = NULL; ColumnBuffers **columnBuffersArray = NULL;
@ -475,8 +475,6 @@ LoadFilteredStripeBuffers(FILE *tableFile, StripeMetadata *stripeMetadata,
uint32 columnIndex = 0; uint32 columnIndex = 0;
uint32 columnCount = tupleDescriptor->natts; uint32 columnCount = tupleDescriptor->natts;
StripeFooter *stripeFooter = LoadStripeFooter(tableFile, stripeMetadata,
columnCount);
bool *projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList); bool *projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList);
StripeSkipList *stripeSkipList = LoadStripeSkipList(tableFile, stripeMetadata, StripeSkipList *stripeSkipList = LoadStripeSkipList(tableFile, stripeMetadata,
@ -617,31 +615,6 @@ LoadColumnBuffers(FILE *tableFile, ColumnBlockSkipNode *blockSkipNodeArray,
} }
/*
 * LoadStripeFooter reads the serialized footer of the given stripe from the
 * table file and returns it in deserialized form. An error is raised when the
 * footer reports more columns than the columnCount passed in by the caller.
 */
static StripeFooter *
LoadStripeFooter(FILE *tableFile, StripeMetadata *stripeMetadata,
				 uint32 columnCount)
{
	/* the footer starts right after the stripe's skip list and data bytes */
	uint64 footerPosition = (uint64) stripeMetadata->fileOffset +
							stripeMetadata->skipListLength +
							stripeMetadata->dataLength;
	StringInfo serializedFooter = ReadFromFile(tableFile, footerPosition,
											   stripeMetadata->footerLength);
	StripeFooter *footer = DeserializeStripeFooter(serializedFooter);

	/* a footer with fewer columns than requested is accepted as-is */
	if (footer->columnCount > columnCount)
	{
		ereport(ERROR, (errmsg("stripe footer column count and table column count "
							   "don't match")));
	}

	return footer;
}
/* Reads the skip list for the given stripe. */ /* Reads the skip list for the given stripe. */
static StripeSkipList * static StripeSkipList *
LoadStripeSkipList(FILE *tableFile, StripeMetadata *stripeMetadata, LoadStripeSkipList(FILE *tableFile, StripeMetadata *stripeMetadata,
@ -1377,3 +1350,15 @@ ResetUncompressedBlockData(ColumnBlockData **blockDataArray, uint32 columnCount)
} }
} }
} }
/*
 * RelationColumnCount returns the natts value stored in the tuple descriptor
 * of the relation identified by relid.
 *
 * The relation is looked up with RelationIdGetRelation() and released again
 * with RelationClose() before returning. RelationIdGetRelation() may return
 * NULL when the relation cannot be found, so that case is reported as an
 * error instead of dereferencing a NULL pointer.
 */
static int
RelationColumnCount(Oid relid)
{
	int columnCount = 0;
	Relation rel = RelationIdGetRelation(relid);

	if (rel == NULL)
	{
		ereport(ERROR, (errmsg("could not open relation with OID %u", relid)));
	}

	columnCount = RelationGetDescr(rel)->natts;
	RelationClose(rel);

	return columnCount;
}

View File

@ -65,7 +65,8 @@ static StringInfo CopyStringInfo(StringInfo sourceString);
* will be added. * will be added.
*/ */
TableWriteState * TableWriteState *
CStoreBeginWrite(const char *filename, CompressionType compressionType, CStoreBeginWrite(Oid relationId,
const char *filename, CompressionType compressionType,
uint64 stripeMaxRowCount, uint32 blockRowCount, uint64 stripeMaxRowCount, uint32 blockRowCount,
TupleDesc tupleDescriptor) TupleDesc tupleDescriptor)
{ {
@ -82,6 +83,7 @@ CStoreBeginWrite(const char *filename, CompressionType compressionType,
int statResult = 0; int statResult = 0;
bool *columnMaskArray = NULL; bool *columnMaskArray = NULL;
ColumnBlockData **blockData = NULL; ColumnBlockData **blockData = NULL;
uint64 currentStripeId = 0;
tableFooterFilename = makeStringInfo(); tableFooterFilename = makeStringInfo();
appendStringInfo(tableFooterFilename, "%s%s", filename, CSTORE_FOOTER_FILE_SUFFIX); appendStringInfo(tableFooterFilename, "%s%s", filename, CSTORE_FOOTER_FILE_SUFFIX);
@ -130,6 +132,7 @@ CStoreBeginWrite(const char *filename, CompressionType compressionType,
lastStripeSize += lastStripe->footerLength; lastStripeSize += lastStripe->footerLength;
currentFileOffset = lastStripe->fileOffset + lastStripeSize; currentFileOffset = lastStripe->fileOffset + lastStripeSize;
currentStripeId = lastStripe->id + 1;
errno = 0; errno = 0;
fseekResult = fseeko(tableFile, currentFileOffset, SEEK_SET); fseekResult = fseeko(tableFile, currentFileOffset, SEEK_SET);
@ -173,6 +176,7 @@ CStoreBeginWrite(const char *filename, CompressionType compressionType,
blockData = CreateEmptyBlockDataArray(columnCount, columnMaskArray, blockRowCount); blockData = CreateEmptyBlockDataArray(columnCount, columnMaskArray, blockRowCount);
writeState = palloc0(sizeof(TableWriteState)); writeState = palloc0(sizeof(TableWriteState));
writeState->relationId = relationId;
writeState->tableFile = tableFile; writeState->tableFile = tableFile;
writeState->tableFooterFilename = tableFooterFilename; writeState->tableFooterFilename = tableFooterFilename;
writeState->tableFooter = tableFooter; writeState->tableFooter = tableFooter;
@ -186,6 +190,7 @@ CStoreBeginWrite(const char *filename, CompressionType compressionType,
writeState->stripeWriteContext = stripeWriteContext; writeState->stripeWriteContext = stripeWriteContext;
writeState->blockDataArray = blockData; writeState->blockDataArray = blockData;
writeState->compressionBuffer = NULL; writeState->compressionBuffer = NULL;
writeState->currentStripeId = currentStripeId;
return writeState; return writeState;
} }
@ -286,6 +291,8 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul
StripeMetadata stripeMetadata = FlushStripe(writeState); StripeMetadata stripeMetadata = FlushStripe(writeState);
MemoryContextReset(writeState->stripeWriteContext); MemoryContextReset(writeState->stripeWriteContext);
writeState->currentStripeId++;
/* set stripe data and skip list to NULL so they are recreated next time */ /* set stripe data and skip list to NULL so they are recreated next time */
writeState->stripeBuffers = NULL; writeState->stripeBuffers = NULL;
writeState->stripeSkipList = NULL; writeState->stripeSkipList = NULL;
@ -490,7 +497,6 @@ FlushStripe(TableWriteState *writeState)
uint64 dataLength = 0; uint64 dataLength = 0;
StringInfo *skipListBufferArray = NULL; StringInfo *skipListBufferArray = NULL;
StripeFooter *stripeFooter = NULL; StripeFooter *stripeFooter = NULL;
StringInfo stripeFooterBuffer = NULL;
uint32 columnIndex = 0; uint32 columnIndex = 0;
uint32 blockIndex = 0; uint32 blockIndex = 0;
TableFooter *tableFooter = writeState->tableFooter; TableFooter *tableFooter = writeState->tableFooter;
@ -545,7 +551,6 @@ FlushStripe(TableWriteState *writeState)
/* create skip list and footer buffers */ /* create skip list and footer buffers */
skipListBufferArray = CreateSkipListBufferArray(stripeSkipList, tupleDescriptor); skipListBufferArray = CreateSkipListBufferArray(stripeSkipList, tupleDescriptor);
stripeFooter = CreateStripeFooter(stripeSkipList, skipListBufferArray); stripeFooter = CreateStripeFooter(stripeSkipList, skipListBufferArray);
stripeFooterBuffer = SerializeStripeFooter(stripeFooter);
/* /*
* Each stripe has three sections: * Each stripe has three sections:
@ -594,7 +599,9 @@ FlushStripe(TableWriteState *writeState)
} }
/* finally, we flush the footer buffer */ /* finally, we flush the footer buffer */
WriteToFile(tableFile, stripeFooterBuffer->data, stripeFooterBuffer->len); SaveStripeFooter(writeState->relationId,
writeState->currentStripeId,
stripeFooter);
/* set stripe metadata */ /* set stripe metadata */
for (columnIndex = 0; columnIndex < columnCount; columnIndex++) for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
@ -607,12 +614,12 @@ FlushStripe(TableWriteState *writeState)
stripeMetadata.fileOffset = writeState->currentFileOffset; stripeMetadata.fileOffset = writeState->currentFileOffset;
stripeMetadata.skipListLength = skipListLength; stripeMetadata.skipListLength = skipListLength;
stripeMetadata.dataLength = dataLength; stripeMetadata.dataLength = dataLength;
stripeMetadata.footerLength = stripeFooterBuffer->len; stripeMetadata.footerLength = 0;
stripeMetadata.id = writeState->currentStripeId;
/* advance current file offset */ /* advance current file offset */
writeState->currentFileOffset += skipListLength; writeState->currentFileOffset += skipListLength;
writeState->currentFileOffset += dataLength; writeState->currentFileOffset += dataLength;
writeState->currentFileOffset += stripeFooterBuffer->len;
return stripeMetadata; return stripeMetadata;
} }