diff --git a/.gitignore b/.gitignore index f95fd0b87..21c5e32ea 100644 --- a/.gitignore +++ b/.gitignore @@ -53,5 +53,10 @@ /expected/create.out /expected/data_types.out /expected/load.out +/results/* +/.deps/* +/regression.diffs +/regression.out +.vscode *.pb-c.* diff --git a/Makefile b/Makefile index bd3ae77ce..8f1bf08cc 100644 --- a/Makefile +++ b/Makefile @@ -5,10 +5,11 @@ MODULE_big = cstore_fdw -PG_CPPFLAGS = --std=c99 +PG_CPPFLAGS = -std=c11 SHLIB_LINK = -lprotobuf-c OBJS = cstore.pb-c.o cstore.o cstore_fdw.o cstore_writer.o cstore_reader.o \ - cstore_metadata_serialization.o cstore_compression.o mod.o + cstore_metadata_serialization.o cstore_compression.o mod.o \ + cstore_metadata_tables.o EXTENSION = cstore_fdw DATA = cstore_fdw--1.7.sql cstore_fdw--1.6--1.7.sql cstore_fdw--1.5--1.6.sql cstore_fdw--1.4--1.5.sql \ diff --git a/cstore.c b/cstore.c index e704bc31d..a98f983e3 100644 --- a/cstore.c +++ b/cstore.c @@ -131,7 +131,7 @@ InitializeCStoreTableFile(Oid relationId, Relation relation, CStoreOptions *csto * Initialize state to write to the cstore file. This creates an * empty data file and a valid footer file for the table. */ - writeState = CStoreBeginWrite(cstoreOptions->filename, + writeState = CStoreBeginWrite(relationId, cstoreOptions->filename, cstoreOptions->compressionType, cstoreOptions->stripeRowCount, cstoreOptions->blockRowCount, tupleDescriptor); CStoreEndWrite(writeState); diff --git a/cstore.h b/cstore.h index a694e1e29..500a38cdb 100644 --- a/cstore.h +++ b/cstore.h @@ -89,6 +89,7 @@ typedef struct StripeMetadata uint64 skipListLength; uint64 dataLength; uint64 footerLength; + uint64 id; } StripeMetadata; @@ -213,6 +214,8 @@ typedef struct StripeFooter /* TableReadState represents state of a cstore file read operation. 
*/ typedef struct TableReadState { + Oid relationId; + FILE *tableFile; TableFooter *tableFooter; TupleDesc tupleDescriptor; @@ -238,6 +241,7 @@ typedef struct TableReadState /* TableWriteState represents state of a cstore file write operation. */ typedef struct TableWriteState { + Oid relationId; FILE *tableFile; TableFooter *tableFooter; StringInfo tableFooterFilename; @@ -248,6 +252,7 @@ typedef struct TableWriteState Relation relation; MemoryContext stripeWriteContext; + uint64 currentStripeId; StripeBuffers *stripeBuffers; StripeSkipList *stripeSkipList; uint32 stripeMaxRowCount; @@ -270,7 +275,8 @@ extern void RemoveCStoreDatabaseDirectory(Oid databaseOid); extern void DeleteCStoreTableFiles(char *filename); /* Function declarations for writing to a cstore file */ -extern TableWriteState * CStoreBeginWrite(const char *filename, +extern TableWriteState * CStoreBeginWrite(Oid relationId, + const char *filename, CompressionType compressionType, uint64 stripeMaxRowCount, uint32 blockRowCount, @@ -280,7 +286,8 @@ extern void CStoreWriteRow(TableWriteState *state, Datum *columnValues, extern void CStoreEndWrite(TableWriteState * state); /* Function declarations for reading from a cstore file */ -extern TableReadState * CStoreBeginRead(const char *filename, TupleDesc tupleDescriptor, +extern TableReadState * CStoreBeginRead(Oid relationId, const char *filename, + TupleDesc tupleDescriptor, List *projectedColumnList, List *qualConditions); extern TableFooter * CStoreReadFooter(StringInfo tableFooterFilename); extern bool CStoreReadFinished(TableReadState *state); @@ -295,10 +302,14 @@ extern ColumnBlockData ** CreateEmptyBlockDataArray(uint32 columnCount, bool *co uint32 blockRowCount); extern void FreeColumnBlockDataArray(ColumnBlockData **blockDataArray, uint32 columnCount); -extern uint64 CStoreTableRowCount(const char *filename); +extern uint64 CStoreTableRowCount(Oid relid, const char *filename); extern bool CompressBuffer(StringInfo inputBuffer, StringInfo 
outputBuffer, CompressionType compressionType); extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType); +/* cstore_metadata_tables.c */ +extern void SaveStripeFooter(Oid relid, uint64 stripe, StripeFooter *footer); +extern StripeFooter * ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount); + #endif /* CSTORE_H */ diff --git a/cstore.proto b/cstore.proto index 6e24c9075..ea949c77c 100644 --- a/cstore.proto +++ b/cstore.proto @@ -23,17 +23,12 @@ message ColumnBlockSkipList { repeated ColumnBlockSkipNode blockSkipNodeArray = 1; } -message StripeFooter { - repeated uint64 skipListSizeArray = 1; - repeated uint64 existsSizeArray = 2; - repeated uint64 valueSizeArray = 3; -} - message StripeMetadata { optional uint64 fileOffset = 1; optional uint64 skipListLength = 2; optional uint64 dataLength = 3; optional uint64 footerLength = 4; + optional uint64 id = 5; } message TableFooter { diff --git a/cstore_fdw--1.7.sql b/cstore_fdw--1.7.sql index ad2683f52..fd526e711 100644 --- a/cstore_fdw--1.7.sql +++ b/cstore_fdw--1.7.sql @@ -58,3 +58,17 @@ CREATE EVENT TRIGGER cstore_drop_event ON SQL_DROP EXECUTE PROCEDURE cstore_drop_trigger(); +CREATE TABLE cstore_stripe_attr ( + relid oid, + stripe bigint, + attr int, + exists_size bigint, + value_size bigint, + skiplist_size bigint +) WITH (user_catalog_table = true); + +CREATE INDEX cstore_stripe_attr_idx + ON cstore_stripe_attr + USING BTREE(relid, stripe, attr); + +ALTER TABLE cstore_stripe_attr SET SCHEMA pg_catalog; diff --git a/cstore_fdw.c b/cstore_fdw.c index 9787fd2a2..6bcb92269 100644 --- a/cstore_fdw.c +++ b/cstore_fdw.c @@ -152,7 +152,7 @@ static ForeignScan * CStoreGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel Oid foreignTableId, ForeignPath *bestPath, List *targetList, List *scanClauses); #endif -static double TupleCountEstimate(RelOptInfo *baserel, const char *filename); +static double TupleCountEstimate(Oid relid, RelOptInfo *baserel, const char *filename); 
static BlockNumber PageCount(const char *filename); static List * ColumnList(RelOptInfo *baserel, Oid foreignTableId); static void CStoreExplainForeignScan(ForeignScanState *scanState, @@ -602,7 +602,8 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString) #endif /* init state to write to the cstore file */ - writeState = CStoreBeginWrite(cstoreOptions->filename, + writeState = CStoreBeginWrite(relationId, + cstoreOptions->filename, cstoreOptions->compressionType, cstoreOptions->stripeRowCount, cstoreOptions->blockRowCount, @@ -1414,6 +1415,7 @@ ValidateForeignTableOptions(char *filename, char *compressionTypeString, static char * CStoreDefaultFilePath(Oid foreignTableId) { + StringInfo cstoreFilePath = NULL; Relation relation = relation_open(foreignTableId, AccessShareLock); RelFileNode relationFileNode = relation->rd_node; Oid databaseOid = relationFileNode.dbNode; @@ -1429,7 +1431,7 @@ CStoreDefaultFilePath(Oid foreignTableId) } - StringInfo cstoreFilePath = makeStringInfo(); + cstoreFilePath = makeStringInfo(); appendStringInfo(cstoreFilePath, "%s/%s/%u/%u", DataDir, CSTORE_FDW_NAME, databaseOid, relationFileOid); @@ -1445,7 +1447,7 @@ static void CStoreGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId) { CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId); - double tupleCountEstimate = TupleCountEstimate(baserel, cstoreOptions->filename); + double tupleCountEstimate = TupleCountEstimate(foreignTableId, baserel, cstoreOptions->filename); double rowSelectivity = clauselist_selectivity(root, baserel->baserestrictinfo, 0, JOIN_INNER, NULL); @@ -1492,7 +1494,7 @@ CStoreGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId double queryPageCount = relationPageCount * queryColumnRatio; double totalDiskAccessCost = seq_page_cost * queryPageCount; - double tupleCountEstimate = TupleCountEstimate(baserel, cstoreOptions->filename); + double tupleCountEstimate = 
TupleCountEstimate(foreignTableId, baserel, cstoreOptions->filename); /* * We estimate costs almost the same way as cost_seqscan(), thus assuming @@ -1597,7 +1599,7 @@ CStoreGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId, * file. */ static double -TupleCountEstimate(RelOptInfo *baserel, const char *filename) +TupleCountEstimate(Oid relid, RelOptInfo *baserel, const char *filename) { double tupleCountEstimate = 0.0; @@ -1616,7 +1618,7 @@ TupleCountEstimate(RelOptInfo *baserel, const char *filename) } else { - tupleCountEstimate = (double) CStoreTableRowCount(filename); + tupleCountEstimate = (double) CStoreTableRowCount(relid, filename); } return tupleCountEstimate; @@ -1809,8 +1811,8 @@ CStoreBeginForeignScan(ForeignScanState *scanState, int executorFlags) whereClauseList = foreignScan->scan.plan.qual; columnList = (List *) linitial(foreignPrivateList); - readState = CStoreBeginRead(cstoreOptions->filename, tupleDescriptor, - columnList, whereClauseList); + readState = CStoreBeginRead(foreignTableId, cstoreOptions->filename, + tupleDescriptor, columnList, whereClauseList); scanState->fdw_state = (void *) readState; } @@ -2161,7 +2163,8 @@ CStoreBeginForeignInsert(ModifyTableState *modifyTableState, ResultRelInfo *rela cstoreOptions = CStoreGetOptions(foreignTableOid); tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc); - writeState = CStoreBeginWrite(cstoreOptions->filename, + writeState = CStoreBeginWrite(foreignTableOid, + cstoreOptions->filename, cstoreOptions->compressionType, cstoreOptions->stripeRowCount, cstoreOptions->blockRowCount, diff --git a/cstore_metadata_serialization.c b/cstore_metadata_serialization.c index 67ae2ec2c..94e3c3116 100644 --- a/cstore_metadata_serialization.c +++ b/cstore_metadata_serialization.c @@ -95,6 +95,8 @@ SerializeTableFooter(TableFooter *tableFooter) protobufStripeMetadata->datalength = stripeMetadata->dataLength; protobufStripeMetadata->has_footerlength = true; 
protobufStripeMetadata->footerlength = stripeMetadata->footerLength; + protobufStripeMetadata->has_id = true; + protobufStripeMetadata->id = stripeMetadata->id; stripeMetadataArray[stripeIndex] = protobufStripeMetadata; stripeIndex++; @@ -118,38 +120,6 @@ SerializeTableFooter(TableFooter *tableFooter) } -/* - * SerializeStripeFooter serializes given stripe footer and returns the result - * as a StringInfo. - */ -StringInfo -SerializeStripeFooter(StripeFooter *stripeFooter) -{ - StringInfo stripeFooterBuffer = NULL; - Protobuf__StripeFooter protobufStripeFooter = PROTOBUF__STRIPE_FOOTER__INIT; - uint8 *stripeFooterData = NULL; - uint32 stripeFooterSize = 0; - - protobufStripeFooter.n_skiplistsizearray = stripeFooter->columnCount; - protobufStripeFooter.skiplistsizearray = (uint64_t *) stripeFooter->skipListSizeArray; - protobufStripeFooter.n_existssizearray = stripeFooter->columnCount; - protobufStripeFooter.existssizearray = (uint64_t *) stripeFooter->existsSizeArray; - protobufStripeFooter.n_valuesizearray = stripeFooter->columnCount; - protobufStripeFooter.valuesizearray = (uint64_t *) stripeFooter->valueSizeArray; - - stripeFooterSize = protobuf__stripe_footer__get_packed_size(&protobufStripeFooter); - stripeFooterData = palloc0(stripeFooterSize); - protobuf__stripe_footer__pack(&protobufStripeFooter, stripeFooterData); - - stripeFooterBuffer = palloc0(sizeof(StringInfoData)); - stripeFooterBuffer->len = stripeFooterSize; - stripeFooterBuffer->maxlen = stripeFooterSize; - stripeFooterBuffer->data = (char *) stripeFooterData; - - return stripeFooterBuffer; -} - - /* * SerializeColumnSkipList serializes a column skip list, where the colum skip * list includes all block skip nodes for that column. 
The function then returns @@ -315,6 +285,7 @@ DeserializeTableFooter(StringInfo buffer) stripeMetadata->skipListLength = protobufStripeMetadata->skiplistlength; stripeMetadata->dataLength = protobufStripeMetadata->datalength; stripeMetadata->footerLength = protobufStripeMetadata->footerlength; + stripeMetadata->id = protobufStripeMetadata->id; stripeMetadataList = lappend(stripeMetadataList, stripeMetadata); } @@ -329,59 +300,6 @@ DeserializeTableFooter(StringInfo buffer) } -/* - * DeserializeStripeFooter deserializes the given buffer and returns the result - * as a StripeFooter struct. - */ -StripeFooter * -DeserializeStripeFooter(StringInfo buffer) -{ - StripeFooter *stripeFooter = NULL; - Protobuf__StripeFooter *protobufStripeFooter = NULL; - uint64 *skipListSizeArray = NULL; - uint64 *existsSizeArray = NULL; - uint64 *valueSizeArray = NULL; - uint64 sizeArrayLength = 0; - uint32 columnCount = 0; - - protobufStripeFooter = protobuf__stripe_footer__unpack(NULL, buffer->len, - (uint8 *) buffer->data); - if (protobufStripeFooter == NULL) - { - ereport(ERROR, (errmsg("could not unpack column store"), - errdetail("invalid stripe footer buffer"))); - } - - columnCount = protobufStripeFooter->n_skiplistsizearray; - if (protobufStripeFooter->n_existssizearray != columnCount || - protobufStripeFooter->n_valuesizearray != columnCount) - { - ereport(ERROR, (errmsg("could not unpack column store"), - errdetail("stripe size array lengths don't match"))); - } - - sizeArrayLength = columnCount * sizeof(uint64); - - skipListSizeArray = palloc0(sizeArrayLength); - existsSizeArray = palloc0(sizeArrayLength); - valueSizeArray = palloc0(sizeArrayLength); - - memcpy(skipListSizeArray, protobufStripeFooter->skiplistsizearray, sizeArrayLength); - memcpy(existsSizeArray, protobufStripeFooter->existssizearray, sizeArrayLength); - memcpy(valueSizeArray, protobufStripeFooter->valuesizearray, sizeArrayLength); - - protobuf__stripe_footer__free_unpacked(protobufStripeFooter, NULL); - - 
stripeFooter = palloc0(sizeof(StripeFooter)); - stripeFooter->skipListSizeArray = skipListSizeArray; - stripeFooter->existsSizeArray = existsSizeArray; - stripeFooter->valueSizeArray = valueSizeArray; - stripeFooter->columnCount = columnCount; - - return stripeFooter; -} - - /* * DeserializeBlockCount deserializes the given column skip list buffer and * returns the number of blocks in column skip list. diff --git a/cstore_metadata_serialization.h b/cstore_metadata_serialization.h index b8890a5d4..d5b7c90ff 100644 --- a/cstore_metadata_serialization.h +++ b/cstore_metadata_serialization.h @@ -17,7 +17,6 @@ /* Function declarations for metadata serialization */ extern StringInfo SerializePostScript(uint64 tableFooterLength); extern StringInfo SerializeTableFooter(TableFooter *tableFooter); -extern StringInfo SerializeStripeFooter(StripeFooter *stripeFooter); extern StringInfo SerializeColumnSkipList(ColumnBlockSkipNode *blockSkipNodeArray, uint32 blockCount, bool typeByValue, int typeLength); @@ -27,7 +26,6 @@ extern void DeserializePostScript(StringInfo buffer, uint64 *tableFooterLength); extern TableFooter * DeserializeTableFooter(StringInfo buffer); extern uint32 DeserializeBlockCount(StringInfo buffer); extern uint32 DeserializeRowCount(StringInfo buffer); -extern StripeFooter * DeserializeStripeFooter(StringInfo buffer); extern ColumnBlockSkipNode * DeserializeColumnSkipList(StringInfo buffer, bool typeByValue, int typeLength, uint32 blockCount); diff --git a/cstore_metadata_tables.c b/cstore_metadata_tables.c new file mode 100644 index 000000000..e2d003989 --- /dev/null +++ b/cstore_metadata_tables.c @@ -0,0 +1,186 @@ +/*------------------------------------------------------------------------- + * + * cstore_metadata_tables.c + * + * Copyright (c), Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + + +#include "postgres.h" +#include "cstore.h" +#include "cstore_version_compat.h" + +#include +#include "access/nbtree.h" +#include "access/table.h" +#include "access/xact.h" +#include "catalog/indexing.h" +#include "catalog/pg_namespace.h" +#include "catalog/pg_collation.h" +#include "commands/defrem.h" +#include "lib/stringinfo.h" +#include "optimizer/optimizer.h" +#include "port.h" +#include "storage/fd.h" +#include "utils/fmgroids.h" +#include "utils/memutils.h" +#include "utils/lsyscache.h" +#include "utils/rel.h" + +#include "cstore_metadata_serialization.h" + +static Oid CStoreStripeAttrRelationId(void); +static Oid CStoreStripeAttrIndexRelationId(void); +static void InsertStripeAttrRow(Oid relid, uint64 stripe, AttrNumber attr, + uint64 existsSize, uint64 valuesSize, + uint64 skiplistSize); + +/* constants for cstore_stripe_attr */ +#define Natts_cstore_stripe_attr 6 +#define Anum_cstore_stripe_attr_relid 1 +#define Anum_cstore_stripe_attr_stripe 2 +#define Anum_cstore_stripe_attr_attr 3 +#define Anum_cstore_stripe_attr_exists_size 4 +#define Anum_cstore_stripe_attr_value_size 5 +#define Anum_cstore_stripe_attr_skiplist_size 6 + +/* + * SaveStripeFooter stores the given StripeFooter as cstore_stripe_attr records, one per column. + */ +void +SaveStripeFooter(Oid relid, uint64 stripe, StripeFooter *footer) +{ + for (AttrNumber attr = 1; attr <= footer->columnCount; attr++) + { + InsertStripeAttrRow(relid, stripe, attr, + footer->existsSizeArray[attr - 1], + footer->valueSizeArray[attr - 1], + footer->skipListSizeArray[attr - 1]); + } +} + + +/* + * InsertStripeAttrRow adds a row to cstore_stripe_attr. 
+ */
+static void
+InsertStripeAttrRow(Oid relid, uint64 stripe, AttrNumber attr,
+					uint64 existsSize, uint64 valuesSize,
+					uint64 skiplistSize)
+{
+	bool nulls[Natts_cstore_stripe_attr] = { 0 };
+	Datum values[Natts_cstore_stripe_attr] = {
+		ObjectIdGetDatum(relid),
+		Int64GetDatum(stripe),
+		Int32GetDatum(attr),	/* the attr column is declared int, i.e. int4 */
+		Int64GetDatum(existsSize),
+		Int64GetDatum(valuesSize),
+		Int64GetDatum(skiplistSize)
+	};
+
+	Oid cstoreStripeAttrOid = CStoreStripeAttrRelationId();
+	Relation cstoreStripeAttrs = heap_open(cstoreStripeAttrOid, RowExclusiveLock);
+	TupleDesc tupleDescriptor = RelationGetDescr(cstoreStripeAttrs);
+
+	HeapTuple tuple = heap_form_tuple(tupleDescriptor, values, nulls);
+
+	CatalogTupleInsert(cstoreStripeAttrs, tuple);
+
+	CommandCounterIncrement();
+
+	heap_close(cstoreStripeAttrs, NoLock);
+}
+
+
+/*
+ * ReadStripeFooter returns a StripeFooter by reading relevant records from
+ * cstore_stripe_attr. Size arrays have relationColumnCount entries; a record
+ * whose attribute number is outside [1, relationColumnCount] raises an ERROR.
+ */
+StripeFooter *
+ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount)
+{
+	StripeFooter *footer = NULL;
+	HeapTuple heapTuple = NULL;
+
+	Oid cstoreStripeAttrOid = CStoreStripeAttrRelationId();
+	Relation cstoreStripeAttrs = heap_open(cstoreStripeAttrOid, AccessShareLock);
+	Relation index = index_open(CStoreStripeAttrIndexRelationId(), AccessShareLock);
+	TupleDesc tupleDescriptor = RelationGetDescr(cstoreStripeAttrs);
+
+	SysScanDesc scanDescriptor = NULL;
+	ScanKeyData scanKey[2];
+	ScanKeyInit(&scanKey[0], Anum_cstore_stripe_attr_relid,
+				BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relid));
+	ScanKeyInit(&scanKey[1], Anum_cstore_stripe_attr_stripe,
+				BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(stripe));
+
+	scanDescriptor = systable_beginscan_ordered(cstoreStripeAttrs, index, NULL, 2,
+												scanKey);
+
+	footer = palloc0(sizeof(StripeFooter));
+	footer->existsSizeArray = palloc0(relationColumnCount * sizeof(uint64));
+	footer->valueSizeArray = palloc0(relationColumnCount * sizeof(uint64));
+	footer->skipListSizeArray = palloc0(relationColumnCount * sizeof(uint64));
+
+	/*
+	 * Stripe can have less columns than the relation if ALTER TABLE happens
+	 * after stripe is formed, so its column count is the maximum attribute seen.
+	 */
+	footer->columnCount = 0;
+	while (HeapTupleIsValid(heapTuple = systable_getnext_ordered(scanDescriptor,
+																 ForwardScanDirection)))
+	{
+		Datum datumArray[Natts_cstore_stripe_attr];
+		bool isNullArray[Natts_cstore_stripe_attr];
+		int32 attr = 0;
+
+		heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
+		attr = DatumGetInt32(datumArray[Anum_cstore_stripe_attr_attr - 1]);
+
+		/* validate before any use: attr - 1 indexes the size arrays below */
+		if (attr < 1 || attr > relationColumnCount)
+		{
+			ereport(ERROR, (errmsg("unexpected attribute %d for a relation with %d attrs",
+								   attr, relationColumnCount)));
+		}
+
+		footer->columnCount = Max(footer->columnCount, (uint32) attr);
+		footer->existsSizeArray[attr - 1] =
+			DatumGetInt64(datumArray[Anum_cstore_stripe_attr_exists_size - 1]);
+		footer->valueSizeArray[attr - 1] =
+			DatumGetInt64(datumArray[Anum_cstore_stripe_attr_value_size - 1]);
+		footer->skipListSizeArray[attr - 1] =
+			DatumGetInt64(datumArray[Anum_cstore_stripe_attr_skiplist_size - 1]);
+	}
+
+	systable_endscan_ordered(scanDescriptor);
+	index_close(index, NoLock);
+	heap_close(cstoreStripeAttrs, NoLock);
+
+	return footer;
+}
+
+
+/*
+ * CStoreStripeAttrRelationId returns relation id of cstore_stripe_attr.
+ * TODO: should we cache this similar to citus?
+ */
+static Oid
+CStoreStripeAttrRelationId(void)
+{
+	return get_relname_relid("cstore_stripe_attr", PG_CATALOG_NAMESPACE);
+}
+
+
+/*
+ * CStoreStripeAttrIndexRelationId returns relation id of cstore_stripe_attr_idx.
+ * TODO: should we cache this similar to citus?
+ */ +static Oid +CStoreStripeAttrIndexRelationId(void) +{ + return get_relname_relid("cstore_stripe_attr_idx", PG_CATALOG_NAMESPACE); +} diff --git a/cstore_reader.c b/cstore_reader.c index 68ce5cdad..6caf99bc7 100644 --- a/cstore_reader.c +++ b/cstore_reader.c @@ -31,6 +31,7 @@ #include "storage/fd.h" #include "utils/memutils.h" #include "utils/lsyscache.h" +#include "utils/rel.h" #include "cstore.h" #include "cstore_metadata_serialization.h" @@ -39,6 +40,7 @@ /* static function declarations */ static StripeBuffers * LoadFilteredStripeBuffers(FILE *tableFile, StripeMetadata *stripeMetadata, + StripeFooter *stripeFooter, TupleDesc tupleDescriptor, List *projectedColumnList, List *whereClauseList); @@ -51,8 +53,6 @@ static ColumnBuffers * LoadColumnBuffers(FILE *tableFile, uint32 blockCount, uint64 existsFileOffset, uint64 valueFileOffset, Form_pg_attribute attributeForm); -static StripeFooter * LoadStripeFooter(FILE *tableFile, StripeMetadata *stripeMetadata, - uint32 columnCount); static StripeSkipList * LoadStripeSkipList(FILE *tableFile, StripeMetadata *stripeMetadata, StripeFooter *stripeFooter, @@ -86,7 +86,8 @@ static int64 FILESize(FILE *file); static StringInfo ReadFromFile(FILE *file, uint64 offset, uint32 size); static void ResetUncompressedBlockData(ColumnBlockData **blockDataArray, uint32 columnCount); -static uint64 StripeRowCount(FILE *tableFile, StripeMetadata *stripeMetadata); +static uint64 StripeRowCount(Oid relid, FILE *tableFile, StripeMetadata *stripeMetadata); +static int RelationColumnCount(Oid relid); /* @@ -94,7 +95,7 @@ static uint64 StripeRowCount(FILE *tableFile, StripeMetadata *stripeMetadata); * read handle that's used during reading rows and finishing the read operation. 
*/ TableReadState * -CStoreBeginRead(const char *filename, TupleDesc tupleDescriptor, +CStoreBeginRead(Oid relationId, const char *filename, TupleDesc tupleDescriptor, List *projectedColumnList, List *whereClauseList) { TableReadState *readState = NULL; @@ -136,6 +137,7 @@ CStoreBeginRead(const char *filename, TupleDesc tupleDescriptor, tableFooter->blockRowCount); readState = palloc0(sizeof(TableReadState)); + readState->relationId = relationId; readState->tableFile = tableFile; readState->tableFooter = tableFooter; readState->projectedColumnList = projectedColumnList; @@ -247,6 +249,7 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu StripeMetadata *stripeMetadata = NULL; List *stripeMetadataList = tableFooter->stripeMetadataList; uint32 stripeCount = list_length(stripeMetadataList); + StripeFooter *stripeFooter = NULL; /* if we have read all stripes, return false */ if (readState->readStripeCount == stripeCount) @@ -258,7 +261,11 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu MemoryContextReset(readState->stripeReadContext); stripeMetadata = list_nth(stripeMetadataList, readState->readStripeCount); + stripeFooter = ReadStripeFooter(readState->relationId, + stripeMetadata->id, + readState->tupleDescriptor->natts); stripeBuffers = LoadFilteredStripeBuffers(readState->tableFile, stripeMetadata, + stripeFooter, readState->tupleDescriptor, readState->projectedColumnList, readState->whereClauseList); @@ -396,7 +403,7 @@ FreeColumnBlockDataArray(ColumnBlockData **blockDataArray, uint32 columnCount) /* CStoreTableRowCount returns the exact row count of a table using skiplists */ uint64 -CStoreTableRowCount(const char *filename) +CStoreTableRowCount(Oid relid, const char *filename) { TableFooter *tableFooter = NULL; FILE *tableFile; @@ -422,7 +429,7 @@ CStoreTableRowCount(const char *filename) foreach(stripeMetadataCell, tableFooter->stripeMetadataList) { StripeMetadata *stripeMetadata = 
(StripeMetadata *) lfirst(stripeMetadataCell); - totalRowCount += StripeRowCount(tableFile, stripeMetadata); + totalRowCount += StripeRowCount(relid, tableFile, stripeMetadata); } FreeFile(tableFile); @@ -436,20 +443,13 @@ CStoreTableRowCount(const char *filename) * skip list, and returns number of rows for given stripe. */ static uint64 -StripeRowCount(FILE *tableFile, StripeMetadata *stripeMetadata) +StripeRowCount(Oid relid, FILE *tableFile, StripeMetadata *stripeMetadata) { uint64 rowCount = 0; - StripeFooter *stripeFooter = NULL; - StringInfo footerBuffer = NULL; StringInfo firstColumnSkipListBuffer = NULL; - uint64 footerOffset = 0; - footerOffset += stripeMetadata->fileOffset; - footerOffset += stripeMetadata->skipListLength; - footerOffset += stripeMetadata->dataLength; - - footerBuffer = ReadFromFile(tableFile, footerOffset, stripeMetadata->footerLength); - stripeFooter = DeserializeStripeFooter(footerBuffer); + StripeFooter * stripeFooter = ReadStripeFooter(relid, stripeMetadata->id, + RelationColumnCount(relid)); firstColumnSkipListBuffer = ReadFromFile(tableFile, stripeMetadata->fileOffset, stripeFooter->skipListSizeArray[0]); @@ -466,8 +466,8 @@ StripeRowCount(FILE *tableFile, StripeMetadata *stripeMetadata) */ static StripeBuffers * LoadFilteredStripeBuffers(FILE *tableFile, StripeMetadata *stripeMetadata, - TupleDesc tupleDescriptor, List *projectedColumnList, - List *whereClauseList) + StripeFooter *stripeFooter, TupleDesc tupleDescriptor, + List *projectedColumnList, List *whereClauseList) { StripeBuffers *stripeBuffers = NULL; ColumnBuffers **columnBuffersArray = NULL; @@ -475,8 +475,6 @@ LoadFilteredStripeBuffers(FILE *tableFile, StripeMetadata *stripeMetadata, uint32 columnIndex = 0; uint32 columnCount = tupleDescriptor->natts; - StripeFooter *stripeFooter = LoadStripeFooter(tableFile, stripeMetadata, - columnCount); bool *projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList); StripeSkipList *stripeSkipList = 
LoadStripeSkipList(tableFile, stripeMetadata, @@ -617,31 +615,6 @@ LoadColumnBuffers(FILE *tableFile, ColumnBlockSkipNode *blockSkipNodeArray, } -/* Reads and returns the given stripe's footer. */ -static StripeFooter * -LoadStripeFooter(FILE *tableFile, StripeMetadata *stripeMetadata, - uint32 columnCount) -{ - StripeFooter *stripeFooter = NULL; - StringInfo footerBuffer = NULL; - uint64 footerOffset = 0; - - footerOffset += stripeMetadata->fileOffset; - footerOffset += stripeMetadata->skipListLength; - footerOffset += stripeMetadata->dataLength; - - footerBuffer = ReadFromFile(tableFile, footerOffset, stripeMetadata->footerLength); - stripeFooter = DeserializeStripeFooter(footerBuffer); - if (stripeFooter->columnCount > columnCount) - { - ereport(ERROR, (errmsg("stripe footer column count and table column count " - "don't match"))); - } - - return stripeFooter; -} - - /* Reads the skip list for the given stripe. */ static StripeSkipList * LoadStripeSkipList(FILE *tableFile, StripeMetadata *stripeMetadata, @@ -1377,3 +1350,15 @@ ResetUncompressedBlockData(ColumnBlockData **blockDataArray, uint32 columnCount) } } } + + +static int +RelationColumnCount(Oid relid) +{ + Relation rel = RelationIdGetRelation(relid); + TupleDesc tupleDesc = RelationGetDescr(rel); + int columnCount = tupleDesc->natts; + RelationClose(rel); + + return columnCount; +} diff --git a/cstore_writer.c b/cstore_writer.c index 51a01c8f3..76fc703f3 100644 --- a/cstore_writer.c +++ b/cstore_writer.c @@ -65,7 +65,8 @@ static StringInfo CopyStringInfo(StringInfo sourceString); * will be added. 
*/ TableWriteState * -CStoreBeginWrite(const char *filename, CompressionType compressionType, +CStoreBeginWrite(Oid relationId, + const char *filename, CompressionType compressionType, uint64 stripeMaxRowCount, uint32 blockRowCount, TupleDesc tupleDescriptor) { @@ -82,6 +83,7 @@ CStoreBeginWrite(const char *filename, CompressionType compressionType, int statResult = 0; bool *columnMaskArray = NULL; ColumnBlockData **blockData = NULL; + uint64 currentStripeId = 0; tableFooterFilename = makeStringInfo(); appendStringInfo(tableFooterFilename, "%s%s", filename, CSTORE_FOOTER_FILE_SUFFIX); @@ -130,6 +132,7 @@ CStoreBeginWrite(const char *filename, CompressionType compressionType, lastStripeSize += lastStripe->footerLength; currentFileOffset = lastStripe->fileOffset + lastStripeSize; + currentStripeId = lastStripe->id + 1; errno = 0; fseekResult = fseeko(tableFile, currentFileOffset, SEEK_SET); @@ -173,6 +176,7 @@ CStoreBeginWrite(const char *filename, CompressionType compressionType, blockData = CreateEmptyBlockDataArray(columnCount, columnMaskArray, blockRowCount); writeState = palloc0(sizeof(TableWriteState)); + writeState->relationId = relationId; writeState->tableFile = tableFile; writeState->tableFooterFilename = tableFooterFilename; writeState->tableFooter = tableFooter; @@ -186,6 +190,7 @@ CStoreBeginWrite(const char *filename, CompressionType compressionType, writeState->stripeWriteContext = stripeWriteContext; writeState->blockDataArray = blockData; writeState->compressionBuffer = NULL; + writeState->currentStripeId = currentStripeId; return writeState; } @@ -286,6 +291,8 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul StripeMetadata stripeMetadata = FlushStripe(writeState); MemoryContextReset(writeState->stripeWriteContext); + writeState->currentStripeId++; + /* set stripe data and skip list to NULL so they are recreated next time */ writeState->stripeBuffers = NULL; writeState->stripeSkipList = NULL; @@ -490,7 +497,6 @@ 
FlushStripe(TableWriteState *writeState) uint64 dataLength = 0; StringInfo *skipListBufferArray = NULL; StripeFooter *stripeFooter = NULL; - StringInfo stripeFooterBuffer = NULL; uint32 columnIndex = 0; uint32 blockIndex = 0; TableFooter *tableFooter = writeState->tableFooter; @@ -545,7 +551,6 @@ FlushStripe(TableWriteState *writeState) /* create skip list and footer buffers */ skipListBufferArray = CreateSkipListBufferArray(stripeSkipList, tupleDescriptor); stripeFooter = CreateStripeFooter(stripeSkipList, skipListBufferArray); - stripeFooterBuffer = SerializeStripeFooter(stripeFooter); /* * Each stripe has three sections: @@ -594,7 +599,9 @@ FlushStripe(TableWriteState *writeState) } /* finally, we flush the footer buffer */ - WriteToFile(tableFile, stripeFooterBuffer->data, stripeFooterBuffer->len); + SaveStripeFooter(writeState->relationId, + writeState->currentStripeId, + stripeFooter); /* set stripe metadata */ for (columnIndex = 0; columnIndex < columnCount; columnIndex++) @@ -607,12 +614,12 @@ FlushStripe(TableWriteState *writeState) stripeMetadata.fileOffset = writeState->currentFileOffset; stripeMetadata.skipListLength = skipListLength; stripeMetadata.dataLength = dataLength; - stripeMetadata.footerLength = stripeFooterBuffer->len; + stripeMetadata.footerLength = 0; + stripeMetadata.id = writeState->currentStripeId; /* advance current file offset */ writeState->currentFileOffset += skipListLength; writeState->currentFileOffset += dataLength; - writeState->currentFileOffset += stripeFooterBuffer->len; return stripeMetadata; }