From 9e247cdf40200cc85c813d3774575bb63829d886 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Mon, 7 Sep 2020 21:51:28 -0700 Subject: [PATCH 1/4] Move table footer to metadata tables --- cstore.c | 15 +- cstore.h | 18 +- cstore.proto | 22 --- cstore_fdw--1.7.sql | 30 ++- cstore_fdw.c | 17 +- cstore_metadata_serialization.c | 198 ------------------- cstore_metadata_serialization.h | 3 - cstore_metadata_tables.c | 332 +++++++++++++++++++++++++++++++- cstore_reader.c | 142 ++------------ cstore_writer.c | 139 +++---------- expected/truncate.out | 4 +- 11 files changed, 407 insertions(+), 513 deletions(-) diff --git a/cstore.c b/cstore.c index a259f0430..658c15745 100644 --- a/cstore.c +++ b/cstore.c @@ -130,6 +130,8 @@ InitializeCStoreTableFile(Oid relationId, Relation relation, CStoreOptions *csto TableWriteState *writeState = NULL; TupleDesc tupleDescriptor = RelationGetDescr(relation); + InitCStoreTableMetadata(relationId, cstoreOptions->blockRowCount); + /* * Initialize state to write to the cstore file. This creates an * empty data file and a valid footer file for the table. @@ -183,19 +185,6 @@ void DeleteCStoreTableFiles(char *filename) { int dataFileRemoved = 0; - int footerFileRemoved = 0; - - StringInfo tableFooterFilename = makeStringInfo(); - appendStringInfo(tableFooterFilename, "%s%s", filename, CSTORE_FOOTER_FILE_SUFFIX); - - /* delete the footer file */ - footerFileRemoved = unlink(tableFooterFilename->data); - if (footerFileRemoved != 0) - { - ereport(WARNING, (errcode_for_file_access(), - errmsg("could not delete file \"%s\": %m", - tableFooterFilename->data))); - } /* delete the data file */ dataFileRemoved = unlink(filename); diff --git a/cstore.h b/cstore.h index ed850d9ef..20cac7e05 100644 --- a/cstore.h +++ b/cstore.h @@ -46,8 +46,6 @@ /* miscellaneous defines */ #define CSTORE_FDW_NAME "cstore_fdw" -#define CSTORE_FOOTER_FILE_SUFFIX ".footer" -#define CSTORE_TEMP_FILE_SUFFIX ".tmp" #define CSTORE_TUPLE_COST_MULTIPLIER 10 #define CSTORE_POSTSCRIPT_SIZE_LENGTH 1 #define CSTORE_POSTSCRIPT_SIZE_MAX 256 @@ -91,12 +89,12 @@ typedef struct StripeMetadata } StripeMetadata; -/* TableFooter represents the footer of a cstore file. */ -typedef struct TableFooter +/* TableMetadata represents the metadata of a cstore file. */ +typedef struct TableMetadata { List *stripeMetadataList; uint64 blockRowCount; -} TableFooter; +} TableMetadata; /* ColumnBlockSkipNode contains statistics for a ColumnBlockData. */ @@ -206,7 +204,7 @@ typedef struct TableReadState Oid relationId; FILE *tableFile; - TableFooter *tableFooter; + TableMetadata *tableMetadata; TupleDesc tupleDescriptor; /* @@ -231,8 +229,7 @@ typedef struct TableWriteState { Oid relationId; FILE *tableFile; - TableFooter *tableFooter; - StringInfo tableFooterFilename; + TableMetadata *tableMetadata; CompressionType compressionType; TupleDesc tupleDescriptor; FmgrInfo **comparisonFunctionArray; @@ -277,7 +274,6 @@ extern void CStoreEndWrite(TableWriteState *state); extern TableReadState * CStoreBeginRead(Oid relationId, const char *filename, TupleDesc tupleDescriptor, List *projectedColumnList, List *qualConditions); -extern TableFooter * CStoreReadFooter(StringInfo tableFooterFilename); extern bool CStoreReadFinished(TableReadState *state); extern bool CStoreReadNextRow(TableReadState *state, Datum *columnValues, bool *columnNulls); @@ -298,6 +294,8 @@ extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressio /* cstore_metadata_tables.c */ extern void SaveStripeFooter(Oid relid, uint64 stripe, StripeFooter *footer); extern StripeFooter * ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount); - +extern void InitCStoreTableMetadata(Oid relid, int blockRowCount); +extern void InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe); +extern TableMetadata * ReadTableMetadata(Oid relid); #endif /* CSTORE_H */ diff --git a/cstore.proto b/cstore.proto index ea949c77c..a7525b633 100644 --- a/cstore.proto +++ b/cstore.proto @@ -22,25 +22,3 @@ message ColumnBlockSkipNode { message ColumnBlockSkipList { repeated ColumnBlockSkipNode blockSkipNodeArray = 1; } - -message StripeMetadata { - optional uint64 fileOffset = 1; - optional uint64 skipListLength = 2; - optional uint64 dataLength = 3; - optional uint64 footerLength = 4; - optional uint64 id = 5; -} - -message TableFooter { - repeated StripeMetadata stripeMetadataArray = 1; - optional uint32 blockRowCount = 2; -} - -message PostScript { - optional uint64 tableFooterLength = 1; - optional uint64 versionMajor = 2; - optional uint64 versionMinor = 3; - - // Leave this last in the record - optional string magicNumber = 8000; -} diff --git a/cstore_fdw--1.7.sql b/cstore_fdw--1.7.sql index fd526e711..86589ca90 100644 --- a/cstore_fdw--1.7.sql +++ b/cstore_fdw--1.7.sql @@ -58,17 +58,37 @@ CREATE EVENT TRIGGER cstore_drop_event ON SQL_DROP EXECUTE PROCEDURE cstore_drop_trigger(); +CREATE TABLE cstore_tables ( + relid oid, + block_row_count int, + version_major bigint, + version_minor bigint, + PRIMARY KEY (relid) +) WITH (user_catalog_table = true); + +ALTER TABLE cstore_tables SET SCHEMA pg_catalog; + +CREATE TABLE cstore_stripes ( + relid oid, + stripe bigint, + file_offset bigint, + skiplist_length bigint, + data_length bigint, + PRIMARY KEY (relid, stripe), + FOREIGN KEY (relid) REFERENCES cstore_tables(relid) ON DELETE CASCADE +) WITH (user_catalog_table = true); + +ALTER TABLE cstore_stripes SET SCHEMA pg_catalog; + CREATE TABLE cstore_stripe_attr ( relid oid, stripe bigint, attr int, exists_size bigint, value_size bigint, - skiplist_size bigint + skiplist_size bigint, + PRIMARY KEY (relid, stripe, attr), + FOREIGN KEY (relid, stripe) REFERENCES cstore_stripes(relid, stripe) ON DELETE CASCADE ) WITH (user_catalog_table = true); -CREATE INDEX cstore_stripe_attr_idx - ON cstore_stripe_attr - USING BTREE(relid, stripe, attr); - ALTER TABLE cstore_stripe_attr SET SCHEMA pg_catalog; diff --git a/cstore_fdw.c b/cstore_fdw.c index 7d43c07d5..8ce3a7296 100644 --- a/cstore_fdw.c +++ b/cstore_fdw.c @@ -426,6 +426,7 @@ CStoreProcessUtility(Node * parseTree, const char * queryString, RemoveCStoreDatabaseDirectory(databaseOid); } } + /* handle other utility statements */ else { @@ -1026,11 +1027,8 @@ cstore_table_size(PG_FUNCTION_ARGS) int64 tableSize = 0; CStoreOptions *cstoreOptions = NULL; char *dataFilename = NULL; - StringInfo footerFilename = NULL; int dataFileStatResult = 0; - int footerFileStatResult = 0; struct stat dataFileStatBuffer; - struct stat footerFileStatBuffer; bool cstoreTable = CStoreTable(relationId); if (!cstoreTable) @@ -1048,20 +1046,7 @@ cstore_table_size(PG_FUNCTION_ARGS) errmsg("could not stat file \"%s\": %m", dataFilename))); } - footerFilename = makeStringInfo(); - appendStringInfo(footerFilename, "%s%s", dataFilename, - CSTORE_FOOTER_FILE_SUFFIX); - - footerFileStatResult = stat(footerFilename->data, &footerFileStatBuffer); - if (footerFileStatResult != 0) - { - ereport(ERROR, (errcode_for_file_access(), - errmsg("could not stat file \"%s\": %m", - footerFilename->data))); - } - tableSize += dataFileStatBuffer.st_size; - tableSize += footerFileStatBuffer.st_size; PG_RETURN_INT64(tableSize); } diff --git a/cstore_metadata_serialization.c b/cstore_metadata_serialization.c index 09c17ee7f..2b06d4a15 100644 --- a/cstore_metadata_serialization.c +++ b/cstore_metadata_serialization.c @@ -28,98 +28,6 @@ static Datum ProtobufBinaryToDatum(ProtobufCBinaryData protobufBinary, bool typeByValue, int typeLength); -/* - * SerializePostScript serializes the given postscript and returns the result as - * a StringInfo. - */ -StringInfo -SerializePostScript(uint64 tableFooterLength) -{ - StringInfo postscriptBuffer = NULL; - Protobuf__PostScript protobufPostScript = PROTOBUF__POST_SCRIPT__INIT; - uint8 *postscriptData = NULL; - uint32 postscriptSize = 0; - - protobufPostScript.has_tablefooterlength = true; - protobufPostScript.tablefooterlength = tableFooterLength; - protobufPostScript.has_versionmajor = true; - protobufPostScript.versionmajor = CSTORE_VERSION_MAJOR; - protobufPostScript.has_versionminor = true; - protobufPostScript.versionminor = CSTORE_VERSION_MINOR; - protobufPostScript.magicnumber = pstrdup(CSTORE_MAGIC_NUMBER); - - postscriptSize = protobuf__post_script__get_packed_size(&protobufPostScript); - postscriptData = palloc0(postscriptSize); - protobuf__post_script__pack(&protobufPostScript, postscriptData); - - postscriptBuffer = palloc0(sizeof(StringInfoData)); - postscriptBuffer->len = postscriptSize; - postscriptBuffer->maxlen = postscriptSize; - postscriptBuffer->data = (char *) postscriptData; - - return postscriptBuffer; -} - - -/* - * SerializeTableFooter serializes the given table footer and returns the result - * as a StringInfo. - */ -StringInfo -SerializeTableFooter(TableFooter *tableFooter) -{ - StringInfo tableFooterBuffer = NULL; - Protobuf__TableFooter protobufTableFooter = PROTOBUF__TABLE_FOOTER__INIT; - Protobuf__StripeMetadata **stripeMetadataArray = NULL; - ListCell *stripeMetadataCell = NULL; - uint8 *tableFooterData = NULL; - uint32 tableFooterSize = 0; - uint32 stripeIndex = 0; - - List *stripeMetadataList = tableFooter->stripeMetadataList; - uint32 stripeCount = list_length(stripeMetadataList); - stripeMetadataArray = palloc0(stripeCount * sizeof(Protobuf__StripeMetadata *)); - - foreach(stripeMetadataCell, stripeMetadataList) - { - StripeMetadata *stripeMetadata = lfirst(stripeMetadataCell); - - Protobuf__StripeMetadata *protobufStripeMetadata = NULL; - protobufStripeMetadata = palloc0(sizeof(Protobuf__StripeMetadata)); - protobuf__stripe_metadata__init(protobufStripeMetadata); - protobufStripeMetadata->has_fileoffset = true; - protobufStripeMetadata->fileoffset = stripeMetadata->fileOffset; - protobufStripeMetadata->has_skiplistlength = true; - protobufStripeMetadata->skiplistlength = stripeMetadata->skipListLength; - protobufStripeMetadata->has_datalength = true; - protobufStripeMetadata->datalength = stripeMetadata->dataLength; - protobufStripeMetadata->has_footerlength = true; - protobufStripeMetadata->footerlength = stripeMetadata->footerLength; - protobufStripeMetadata->has_id = true; - protobufStripeMetadata->id = stripeMetadata->id; - - stripeMetadataArray[stripeIndex] = protobufStripeMetadata; - stripeIndex++; - } - - protobufTableFooter.n_stripemetadataarray = stripeCount; - protobufTableFooter.stripemetadataarray = stripeMetadataArray; - protobufTableFooter.has_blockrowcount = true; - protobufTableFooter.blockrowcount = tableFooter->blockRowCount; - - tableFooterSize = protobuf__table_footer__get_packed_size(&protobufTableFooter); - tableFooterData = palloc0(tableFooterSize); - protobuf__table_footer__pack(&protobufTableFooter, tableFooterData); - - tableFooterBuffer = palloc0(sizeof(StringInfoData)); - tableFooterBuffer->len = tableFooterSize; - tableFooterBuffer->maxlen = tableFooterSize; - tableFooterBuffer->data = (char *) tableFooterData; - - return tableFooterBuffer; -} - - /* * SerializeColumnSkipList serializes a column skip list, where the colum skip * list includes all block skip nodes for that column. The function then returns @@ -194,112 +102,6 @@ SerializeColumnSkipList(ColumnBlockSkipNode *blockSkipNodeArray, uint32 blockCou } -/* - * DeserializePostScript deserializes the given postscript buffer and returns - * the size of table footer in tableFooterLength pointer. - */ -void -DeserializePostScript(StringInfo buffer, uint64 *tableFooterLength) -{ - Protobuf__PostScript *protobufPostScript = NULL; - protobufPostScript = protobuf__post_script__unpack(NULL, buffer->len, - (uint8 *) buffer->data); - if (protobufPostScript == NULL) - { - ereport(ERROR, (errmsg("could not unpack column store"), - errdetail("invalid postscript buffer"))); - } - - if (protobufPostScript->versionmajor != CSTORE_VERSION_MAJOR || - protobufPostScript->versionminor > CSTORE_VERSION_MINOR) - { - ereport(ERROR, (errmsg("could not unpack column store"), - errdetail("invalid column store version number"))); - } - else if (strncmp(protobufPostScript->magicnumber, CSTORE_MAGIC_NUMBER, - NAMEDATALEN) != 0) - { - ereport(ERROR, (errmsg("could not unpack column store"), - errdetail("invalid magic number"))); - } - - (*tableFooterLength) = protobufPostScript->tablefooterlength; - - protobuf__post_script__free_unpacked(protobufPostScript, NULL); -} - - -/* - * DeserializeTableFooter deserializes the given buffer and returns the result as - * a TableFooter struct. - */ -TableFooter * -DeserializeTableFooter(StringInfo buffer) -{ - TableFooter *tableFooter = NULL; - Protobuf__TableFooter *protobufTableFooter = NULL; - List *stripeMetadataList = NIL; - uint64 blockRowCount = 0; - uint32 stripeCount = 0; - uint32 stripeIndex = 0; - - protobufTableFooter = protobuf__table_footer__unpack(NULL, buffer->len, - (uint8 *) buffer->data); - if (protobufTableFooter == NULL) - { - ereport(ERROR, (errmsg("could not unpack column store"), - errdetail("invalid table footer buffer"))); - } - - if (!protobufTableFooter->has_blockrowcount) - { - ereport(ERROR, (errmsg("could not unpack column store"), - errdetail("missing required table footer metadata fields"))); - } - else if (protobufTableFooter->blockrowcount < BLOCK_ROW_COUNT_MINIMUM || - protobufTableFooter->blockrowcount > BLOCK_ROW_COUNT_MAXIMUM) - { - ereport(ERROR, (errmsg("could not unpack column store"), - errdetail("invalid block row count"))); - } - blockRowCount = protobufTableFooter->blockrowcount; - - stripeCount = protobufTableFooter->n_stripemetadataarray; - for (stripeIndex = 0; stripeIndex < stripeCount; stripeIndex++) - { - StripeMetadata *stripeMetadata = NULL; - Protobuf__StripeMetadata *protobufStripeMetadata = NULL; - - protobufStripeMetadata = protobufTableFooter->stripemetadataarray[stripeIndex]; - if (!protobufStripeMetadata->has_fileoffset || - !protobufStripeMetadata->has_skiplistlength || - !protobufStripeMetadata->has_datalength || - !protobufStripeMetadata->has_footerlength) - { - ereport(ERROR, (errmsg("could not unpack column store"), - errdetail("missing required stripe metadata fields"))); - } - - stripeMetadata = palloc0(sizeof(StripeMetadata)); - stripeMetadata->fileOffset = protobufStripeMetadata->fileoffset; - stripeMetadata->skipListLength = protobufStripeMetadata->skiplistlength; - stripeMetadata->dataLength = protobufStripeMetadata->datalength; - stripeMetadata->footerLength = protobufStripeMetadata->footerlength; - stripeMetadata->id = protobufStripeMetadata->id; - - stripeMetadataList = lappend(stripeMetadataList, stripeMetadata); - } - - protobuf__table_footer__free_unpacked(protobufTableFooter, NULL); - - tableFooter = palloc0(sizeof(TableFooter)); - tableFooter->stripeMetadataList = stripeMetadataList; - tableFooter->blockRowCount = blockRowCount; - - return tableFooter; -} - - /* * DeserializeBlockCount deserializes the given column skip list buffer and * returns the number of blocks in column skip list. diff --git a/cstore_metadata_serialization.h b/cstore_metadata_serialization.h index 12a3d135b..efd27000a 100644 --- a/cstore_metadata_serialization.h +++ b/cstore_metadata_serialization.h @@ -15,15 +15,12 @@ #define CSTORE_SERIALIZATION_H /* Function declarations for metadata serialization */ -extern StringInfo SerializePostScript(uint64 tableFooterLength); -extern StringInfo SerializeTableFooter(TableFooter *tableFooter); extern StringInfo SerializeColumnSkipList(ColumnBlockSkipNode *blockSkipNodeArray, uint32 blockCount, bool typeByValue, int typeLength); /* Function declarations for metadata deserialization */ extern void DeserializePostScript(StringInfo buffer, uint64 *tableFooterLength); -extern TableFooter * DeserializeTableFooter(StringInfo buffer); extern uint32 DeserializeBlockCount(StringInfo buffer); extern uint32 DeserializeRowCount(StringInfo buffer); extern ColumnBlockSkipNode * DeserializeColumnSkipList(StringInfo buffer, diff --git a/cstore_metadata_tables.c b/cstore_metadata_tables.c index e2d003989..f5168de1e 100644 --- a/cstore_metadata_tables.c +++ b/cstore_metadata_tables.c @@ -13,13 +13,21 @@ #include "cstore_version_compat.h" #include +#include "access/heapam.h" #include "access/nbtree.h" #include "access/table.h" +#include "access/tableam.h" #include "access/xact.h" #include "catalog/indexing.h" #include "catalog/pg_namespace.h" #include "catalog/pg_collation.h" +#include "catalog/pg_type.h" #include "commands/defrem.h" +#include "commands/trigger.h" +#include "executor/executor.h" +#include "executor/spi.h" +#include "miscadmin.h" +#include "nodes/execnodes.h" #include "lib/stringinfo.h" #include "optimizer/optimizer.h" #include "port.h" @@ -33,9 +41,16 @@ static Oid CStoreStripeAttrRelationId(void); static Oid CStoreStripeAttrIndexRelationId(void); +static Oid CStoreStripesRelationId(void); +static Oid CStoreStripesIndexRelationId(void); +static Oid CStoreTablesRelationId(void); +static Oid CStoreTablesIndexRelationId(void); static void InsertStripeAttrRow(Oid relid, uint64 stripe, AttrNumber attr, uint64 existsSize, uint64 valuesSize, uint64 skiplistSize); +static int TableBlockRowCount(Oid relid); +static void DeleteTableMetadataRowIfExists(Oid relid); +static EState * create_estate_for_relation(Relation rel); /* constants for cstore_stripe_attr */ #define Natts_cstore_stripe_attr 6 @@ -46,6 +61,275 @@ static void InsertStripeAttrRow(Oid relid, uint64 stripe, AttrNumber attr, #define Anum_cstore_stripe_attr_value_size 5 #define Anum_cstore_stripe_attr_skiplist_size 6 +/* constants for cstore_table */ +#define Natts_cstore_tables 4 +#define Anum_cstore_tables_relid 1 +#define Anum_cstore_tables_block_row_count 2 +#define Anum_cstore_tables_version_major 3 +#define Anum_cstore_tables_version_minor 4 + +/* constants for cstore_stripe */ +#define Natts_cstore_stripes 5 +#define Anum_cstore_stripes_relid 1 +#define Anum_cstore_stripes_stripe 2 +#define Anum_cstore_stripes_file_offset 3 +#define Anum_cstore_stripes_skiplist_length 4 +#define Anum_cstore_stripes_data_length 5 + +/* + * InitCStoreTableMetadata adds a record for the given relation in cstore_table. + */ +void +InitCStoreTableMetadata(Oid relid, int blockRowCount) +{ + Oid cstoreTableOid = InvalidOid; + Relation cstoreTable = NULL; + TupleDesc tupleDescriptor = NULL; + HeapTuple tuple = NULL; + + bool nulls[Natts_cstore_tables] = { 0 }; + Datum values[Natts_cstore_tables] = { + ObjectIdGetDatum(relid), + Int32GetDatum(blockRowCount), + Int32GetDatum(CSTORE_VERSION_MAJOR), + Int32GetDatum(CSTORE_VERSION_MINOR) + }; + + DeleteTableMetadataRowIfExists(relid); + + cstoreTableOid = CStoreTablesRelationId(); + cstoreTable = heap_open(cstoreTableOid, RowExclusiveLock); + tupleDescriptor = RelationGetDescr(cstoreTable); + + tuple = heap_form_tuple(tupleDescriptor, values, nulls); + + CatalogTupleInsert(cstoreTable, tuple); + + CommandCounterIncrement(); + + heap_close(cstoreTable, NoLock); +} + + +/* + * InsertStripeMetadataRow adds a row to cstore_stripes. + */ +void +InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe) +{ + bool nulls[Natts_cstore_stripes] = { 0 }; + Datum values[Natts_cstore_stripes] = { + ObjectIdGetDatum(relid), + Int64GetDatum(stripe->id), + Int64GetDatum(stripe->fileOffset), + Int64GetDatum(stripe->skipListLength), + Int64GetDatum(stripe->dataLength) + }; + + Oid cstoreStripesOid = CStoreStripesRelationId(); + Relation cstoreStripes = heap_open(cstoreStripesOid, RowExclusiveLock); + TupleDesc tupleDescriptor = RelationGetDescr(cstoreStripes); + + HeapTuple tuple = heap_form_tuple(tupleDescriptor, values, nulls); + + CatalogTupleInsert(cstoreStripes, tuple); + + CommandCounterIncrement(); + + heap_close(cstoreStripes, NoLock); +} + + +/* + * ReadTableMetadata constructs TableMetadata for a given relid by reading + * from cstore_tables and cstore_stripes. + */ +TableMetadata * +ReadTableMetadata(Oid relid) +{ + Oid cstoreStripesOid = InvalidOid; + Relation cstoreStripes = NULL; + Relation index = NULL; + TupleDesc tupleDescriptor = NULL; + ScanKeyData scanKey[1]; + SysScanDesc scanDescriptor = NULL; + HeapTuple heapTuple; + + TableMetadata *tableMetadata = palloc0(sizeof(TableMetadata)); + tableMetadata->blockRowCount = TableBlockRowCount(relid); + + ScanKeyInit(&scanKey[0], Anum_cstore_stripes_relid, + BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relid)); + + cstoreStripesOid = CStoreStripesRelationId(); + cstoreStripes = heap_open(cstoreStripesOid, AccessShareLock); + index = index_open(CStoreStripesIndexRelationId(), AccessShareLock); + tupleDescriptor = RelationGetDescr(cstoreStripes); + + scanDescriptor = systable_beginscan_ordered(cstoreStripes, index, NULL, 1, scanKey); + + while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) + { + StripeMetadata *stripeMetadata = NULL; + Datum datumArray[Natts_cstore_stripes]; + bool isNullArray[Natts_cstore_stripes]; + + heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); + + stripeMetadata = palloc0(sizeof(StripeMetadata)); + stripeMetadata->id = DatumGetInt64(datumArray[Anum_cstore_stripes_stripe - 1]); + stripeMetadata->fileOffset = DatumGetInt64( + datumArray[Anum_cstore_stripes_file_offset - 1]); + stripeMetadata->dataLength = DatumGetInt64( + datumArray[Anum_cstore_stripes_data_length - 1]); + stripeMetadata->skipListLength = DatumGetInt64( + datumArray[Anum_cstore_stripes_skiplist_length - 1]); + + tableMetadata->stripeMetadataList = lappend(tableMetadata->stripeMetadataList, + stripeMetadata); + } + + systable_endscan_ordered(scanDescriptor); + index_close(index, NoLock); + heap_close(cstoreStripes, NoLock); + + return tableMetadata; +} + + +/* + * TableBlockRowCount returns block_row_count column from cstore_tables for a given relid. + */ +static int +TableBlockRowCount(Oid relid) +{ + int blockRowCount = 0; + Oid cstoreTablesOid = InvalidOid; + Relation cstoreTables = NULL; + Relation index = NULL; + TupleDesc tupleDescriptor = NULL; + ScanKeyData scanKey[1]; + SysScanDesc scanDescriptor = NULL; + HeapTuple heapTuple = NULL; + + ScanKeyInit(&scanKey[0], Anum_cstore_tables_relid, + BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relid)); + + cstoreTablesOid = CStoreTablesRelationId(); + cstoreTables = heap_open(cstoreTablesOid, AccessShareLock); + index = index_open(CStoreTablesIndexRelationId(), AccessShareLock); + tupleDescriptor = RelationGetDescr(cstoreTables); + + scanDescriptor = systable_beginscan_ordered(cstoreTables, index, NULL, 1, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + if (HeapTupleIsValid(heapTuple)) + { + Datum datumArray[Natts_cstore_tables]; + bool isNullArray[Natts_cstore_tables]; + heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); + blockRowCount = DatumGetInt32(datumArray[Anum_cstore_tables_block_row_count - 1]); + } + + systable_endscan_ordered(scanDescriptor); + index_close(index, NoLock); + heap_close(cstoreTables, NoLock); + + return blockRowCount; +} + + +/* + * DeleteTableMetadataRowIfExists removes the row with given relid from cstore_stripes. + */ +static void +DeleteTableMetadataRowIfExists(Oid relid) +{ + Oid cstoreTablesOid = InvalidOid; + Relation cstoreTables = NULL; + Relation index = NULL; + ScanKeyData scanKey[1]; + SysScanDesc scanDescriptor = NULL; + HeapTuple heapTuple = NULL; + + ScanKeyInit(&scanKey[0], Anum_cstore_tables_relid, + BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relid)); + + cstoreTablesOid = CStoreTablesRelationId(); + cstoreTables = table_open(cstoreTablesOid, AccessShareLock); + index = index_open(CStoreTablesIndexRelationId(), AccessShareLock); + + scanDescriptor = systable_beginscan_ordered(cstoreTables, index, NULL, 1, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + if (HeapTupleIsValid(heapTuple)) + { + EState *estate = create_estate_for_relation(cstoreTables); + ResultRelInfo *resultRelInfo = estate->es_result_relation_info; + + ItemPointer tid = &(heapTuple->t_self); + simple_table_tuple_delete(cstoreTables, tid, estate->es_snapshot); + + /* + * Execute AFTER ROW DELETE Triggers to enforce foreign key + * constraints. + */ + ExecARDeleteTriggers(estate, resultRelInfo, + tid, NULL, NULL); + + AfterTriggerEndQuery(estate); + ExecCleanUpTriggerState(estate); + ExecResetTupleTable(estate->es_tupleTable, false); + FreeExecutorState(estate); + } + + systable_endscan_ordered(scanDescriptor); + index_close(index, NoLock); + table_close(cstoreTables, NoLock); +} + + +/* + * Based on a similar function from + * postgres/src/backend/replication/logical/worker.c. + * + * Executor state preparation for evaluation of constraint expressions, + * indexes and triggers. + * + * This is based on similar code in copy.c + */ +static EState * +create_estate_for_relation(Relation rel) +{ + EState *estate; + ResultRelInfo *resultRelInfo; + RangeTblEntry *rte; + + estate = CreateExecutorState(); + + rte = makeNode(RangeTblEntry); + rte->rtekind = RTE_RELATION; + rte->relid = RelationGetRelid(rel); + rte->relkind = rel->rd_rel->relkind; + rte->rellockmode = AccessShareLock; + ExecInitRangeTable(estate, list_make1(rte)); + + resultRelInfo = makeNode(ResultRelInfo); + InitResultRelInfo(resultRelInfo, rel, 1, NULL, 0); + + estate->es_result_relations = resultRelInfo; + estate->es_num_result_relations = 1; + estate->es_result_relation_info = resultRelInfo; + + estate->es_output_cid = GetCurrentCommandId(true); + + /* Prepare to catch AFTER triggers. */ + AfterTriggerBeginQuery(); + + return estate; +} + + /* * SaveStripeFooter stores give StripeFooter as cstore_stripe_attr records. */ @@ -176,11 +460,55 @@ CStoreStripeAttrRelationId(void) /* - * CStoreStripeAttrRelationId returns relation id of cstore_stripe_attr_idx. + * CStoreStripeAttrRelationId returns relation id of cstore_stripe_attr_pkey. * TODO: should we cache this similar to citus? */ static Oid CStoreStripeAttrIndexRelationId(void) { - return get_relname_relid("cstore_stripe_attr_idx", PG_CATALOG_NAMESPACE); + return get_relname_relid("cstore_stripe_attr_pkey", PG_CATALOG_NAMESPACE); +} + + +/* + * CStoreStripesRelationId returns relation id of cstore_stripes. + * TODO: should we cache this similar to citus? + */ +static Oid +CStoreStripesRelationId(void) +{ + return get_relname_relid("cstore_stripes", PG_CATALOG_NAMESPACE); +} + + +/* + * CStoreStripesIndexRelationId returns relation id of cstore_stripes_idx. + * TODO: should we cache this similar to citus? + */ +static Oid +CStoreStripesIndexRelationId(void) +{ + return get_relname_relid("cstore_stripes_pkey", PG_CATALOG_NAMESPACE); +} + + +/* + * CStoreTablesRelationId returns relation id of cstore_tables. + * TODO: should we cache this similar to citus? + */ +static Oid +CStoreTablesRelationId(void) +{ + return get_relname_relid("cstore_tables", PG_CATALOG_NAMESPACE); +} + + +/* + * CStoreTablesIndexRelationId returns relation id of cstore_tables_idx. + * TODO: should we cache this similar to citus? + */ +static Oid +CStoreTablesIndexRelationId(void) +{ + return get_relname_relid("cstore_tables_pkey", PG_CATALOG_NAMESPACE); } diff --git a/cstore_reader.c b/cstore_reader.c index 78c7fe00e..ddef3395a 100644 --- a/cstore_reader.c +++ b/cstore_reader.c @@ -82,7 +82,6 @@ static void DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex TupleDesc tupleDescriptor); static Datum ColumnDefaultValue(TupleConstr *tupleConstraints, Form_pg_attribute attributeForm); -static int64 FILESize(FILE *file); static StringInfo ReadFromFile(FILE *file, uint64 offset, uint32 size); static void ResetUncompressedBlockData(ColumnBlockData **blockDataArray, uint32 columnCount); @@ -99,20 +98,14 @@ CStoreBeginRead(Oid relationId, const char *filename, TupleDesc tupleDescriptor, List *projectedColumnList, List *whereClauseList) { TableReadState *readState = NULL; - TableFooter *tableFooter = NULL; + TableMetadata *tableMetadata = NULL; FILE *tableFile = NULL; MemoryContext stripeReadContext = NULL; uint32 columnCount = 0; bool *projectedColumnMask = NULL; ColumnBlockData **blockDataArray = NULL; - StringInfo tableFooterFilename = makeStringInfo(); - appendStringInfo(tableFooterFilename, "%s%s", filename, CSTORE_FOOTER_FILE_SUFFIX); - - tableFooter = CStoreReadFooter(tableFooterFilename); - - pfree(tableFooterFilename->data); - pfree(tableFooterFilename); + tableMetadata = ReadTableMetadata(relationId); tableFile = AllocateFile(filename, PG_BINARY_R); if (tableFile == NULL) @@ -134,12 +127,12 @@ CStoreBeginRead(Oid relationId, const char *filename, TupleDesc tupleDescriptor, columnCount = tupleDescriptor->natts; projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList); blockDataArray = CreateEmptyBlockDataArray(columnCount, projectedColumnMask, - tableFooter->blockRowCount); + tableMetadata->blockRowCount); readState = palloc0(sizeof(TableReadState)); readState->relationId = relationId; readState->tableFile = tableFile; - readState->tableFooter = tableFooter; + readState->tableMetadata = tableMetadata; readState->projectedColumnList = projectedColumnList; readState->whereClauseList = whereClauseList; readState->stripeBuffers = NULL; @@ -154,76 +147,6 @@ CStoreBeginRead(Oid relationId, const char *filename, TupleDesc tupleDescriptor, } -/* - * CStoreReadFooter reads the cstore file footer from the given file. First, the - * function reads the last byte of the file as the postscript size. Then, the - * function reads the postscript. Last, the function reads and deserializes the - * footer. - */ -TableFooter * -CStoreReadFooter(StringInfo tableFooterFilename) -{ - TableFooter *tableFooter = NULL; - FILE *tableFooterFile = NULL; - uint64 footerOffset = 0; - uint64 footerLength = 0; - StringInfo postscriptBuffer = NULL; - StringInfo postscriptSizeBuffer = NULL; - uint64 postscriptSizeOffset = 0; - uint8 postscriptSize = 0; - uint64 footerFileSize = 0; - uint64 postscriptOffset = 0; - StringInfo footerBuffer = NULL; - int freeResult = 0; - - tableFooterFile = AllocateFile(tableFooterFilename->data, PG_BINARY_R); - if (tableFooterFile == NULL) - { - ereport(ERROR, (errcode_for_file_access(), - errmsg("could not open file \"%s\" for reading: %m", - tableFooterFilename->data), - errhint("Try copying in data to the table."))); - } - - footerFileSize = FILESize(tableFooterFile); - if (footerFileSize < CSTORE_POSTSCRIPT_SIZE_LENGTH) - { - ereport(ERROR, (errmsg("invalid cstore file"))); - } - - postscriptSizeOffset = footerFileSize - CSTORE_POSTSCRIPT_SIZE_LENGTH; - postscriptSizeBuffer = ReadFromFile(tableFooterFile, postscriptSizeOffset, - CSTORE_POSTSCRIPT_SIZE_LENGTH); - memcpy(&postscriptSize, postscriptSizeBuffer->data, CSTORE_POSTSCRIPT_SIZE_LENGTH); - if (postscriptSize + CSTORE_POSTSCRIPT_SIZE_LENGTH > footerFileSize) - { - ereport(ERROR, (errmsg("invalid postscript size"))); - } - - postscriptOffset = footerFileSize - (CSTORE_POSTSCRIPT_SIZE_LENGTH + postscriptSize); - postscriptBuffer = ReadFromFile(tableFooterFile, postscriptOffset, postscriptSize); - - DeserializePostScript(postscriptBuffer, &footerLength); - if (footerLength + postscriptSize + CSTORE_POSTSCRIPT_SIZE_LENGTH > footerFileSize) - { - ereport(ERROR, (errmsg("invalid footer size"))); - } - - footerOffset = postscriptOffset - footerLength; - footerBuffer = ReadFromFile(tableFooterFile, footerOffset, footerLength); - tableFooter = DeserializeTableFooter(footerBuffer); - - freeResult = FreeFile(tableFooterFile); - if (freeResult != 0) - { - ereport(ERROR, (errcode_for_file_access(), - errmsg("could not close file: %m"))); - } - - return tableFooter; -} - - /* * CStoreReadNextRow tries to read a row from the cstore file. On success, it sets * column values and nulls, and returns true. If there are no more rows to read, @@ -234,7 +157,7 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu { uint32 blockIndex = 0; uint32 blockRowIndex = 0; - TableFooter *tableFooter = readState->tableFooter; + TableMetadata *tableMetadata = readState->tableMetadata; MemoryContext oldContext = NULL; /* @@ -247,7 +170,7 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu { StripeBuffers *stripeBuffers = NULL; StripeMetadata *stripeMetadata = NULL; - List *stripeMetadataList = tableFooter->stripeMetadataList; + List *stripeMetadataList = tableMetadata->stripeMetadataList; uint32 stripeCount = list_length(stripeMetadataList); StripeFooter *stripeFooter = NULL; @@ -284,8 +207,8 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu } } - blockIndex = readState->stripeReadRowCount / tableFooter->blockRowCount; - blockRowIndex = readState->stripeReadRowCount % tableFooter->blockRowCount; + blockIndex = readState->stripeReadRowCount / tableMetadata->blockRowCount; + blockRowIndex = readState->stripeReadRowCount % tableMetadata->blockRowCount; if (blockIndex != readState->deserializedBlockIndex) { @@ -294,14 +217,14 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu uint32 stripeRowCount = 0; stripeRowCount = readState->stripeBuffers->rowCount; - lastBlockIndex = stripeRowCount / tableFooter->blockRowCount; + lastBlockIndex = stripeRowCount / tableMetadata->blockRowCount; if (blockIndex == lastBlockIndex) { - blockRowCount = stripeRowCount % tableFooter->blockRowCount; + blockRowCount = stripeRowCount % tableMetadata->blockRowCount; } else { - blockRowCount = tableFooter->blockRowCount; + blockRowCount = tableMetadata->blockRowCount; } oldContext = MemoryContextSwitchTo(readState->stripeReadContext); @@ -341,9 +264,9 @@ CStoreEndRead(TableReadState *readState) MemoryContextDelete(readState->stripeReadContext); FreeFile(readState->tableFile); - list_free_deep(readState->tableFooter->stripeMetadataList); + list_free_deep(readState->tableMetadata->stripeMetadataList); FreeColumnBlockDataArray(readState->blockDataArray, columnCount); - pfree(readState->tableFooter); + pfree(readState->tableMetadata); pfree(readState); } @@ -405,19 +328,12 @@ FreeColumnBlockDataArray(ColumnBlockData **blockDataArray, uint32 columnCount) uint64 CStoreTableRowCount(Oid relid, const char *filename) { - TableFooter *tableFooter = NULL; + TableMetadata *tableMetadata = NULL; FILE *tableFile; ListCell *stripeMetadataCell = NULL; uint64 totalRowCount = 0; - StringInfo tableFooterFilename = makeStringInfo(); - - appendStringInfo(tableFooterFilename, "%s%s", filename, CSTORE_FOOTER_FILE_SUFFIX); - - tableFooter = CStoreReadFooter(tableFooterFilename); - - pfree(tableFooterFilename->data); - pfree(tableFooterFilename); + tableMetadata = ReadTableMetadata(relid); tableFile = AllocateFile(filename, PG_BINARY_R); if (tableFile == NULL) @@ -426,7 +342,7 @@ CStoreTableRowCount(Oid relid, const char *filename) errmsg("could not open file \"%s\" for reading: %m", filename))); } - foreach(stripeMetadataCell, tableFooter->stripeMetadataList) + foreach(stripeMetadataCell, tableMetadata->stripeMetadataList) { StripeMetadata *stripeMetadata = (StripeMetadata *) lfirst(stripeMetadataCell); totalRowCount += StripeRowCount(relid, tableFile, stripeMetadata); @@ -1263,32 +1179,6 @@ ColumnDefaultValue(TupleConstr *tupleConstraints, Form_pg_attribute attributeFor } -/* Returns the size of the given file handle. */ -static int64 -FILESize(FILE *file) -{ - int64 fileSize = 0; - int fseekResult = 0; - - errno = 0; - fseekResult = fseeko(file, 0, SEEK_END); - if (fseekResult != 0) - { - ereport(ERROR, (errcode_for_file_access(), - errmsg("could not seek in file: %m"))); - } - - fileSize = ftello(file); - if (fileSize == -1) - { - ereport(ERROR, (errcode_for_file_access(), - errmsg("could not get position in file: %m"))); - } - - return fileSize; -} - - /* Reads the given segment from the given file. */ static StringInfo ReadFromFile(FILE *file, uint64 offset, uint32 size) diff --git a/cstore_writer.c b/cstore_writer.c index 318d8d518..240c13fc2 100644 --- a/cstore_writer.c +++ b/cstore_writer.c @@ -27,7 +27,6 @@ #include "cstore_metadata_serialization.h" #include "cstore_version_compat.h" -static void CStoreWriteFooter(StringInfo footerFileName, TableFooter *tableFooter); static StripeBuffers * CreateEmptyStripeBuffers(uint32 stripeMaxRowCount, uint32 blockRowCount, uint32 columnCount); @@ -50,7 +49,7 @@ static void UpdateBlockSkipNodeMinMax(ColumnBlockSkipNode *blockSkipNode, int columnTypeLength, Oid columnCollation, FmgrInfo *comparisonFunction); static Datum DatumCopy(Datum datum, bool datumTypeByValue, int datumTypeLength); -static void AppendStripeMetadata(TableFooter *tableFooter, +static void AppendStripeMetadata(TableMetadata *tableMetadata, StripeMetadata stripeMetadata); static void WriteToFile(FILE *file, void *data, uint32 dataLength); static void SyncAndCloseFile(FILE *file); @@ -72,61 +71,37 @@ CStoreBeginWrite(Oid relationId, { TableWriteState *writeState = NULL; FILE *tableFile = NULL; - StringInfo tableFooterFilename = NULL; - TableFooter *tableFooter = NULL; + TableMetadata *tableMetadata = NULL; FmgrInfo **comparisonFunctionArray = NULL; MemoryContext stripeWriteContext = NULL; uint64 currentFileOffset = 0; uint32 columnCount = 0; uint32 columnIndex = 0; - struct stat statBuffer; - int statResult = 0; bool *columnMaskArray = NULL; ColumnBlockData **blockData = NULL; uint64 currentStripeId = 0; - tableFooterFilename = makeStringInfo(); - appendStringInfo(tableFooterFilename, "%s%s", filename, CSTORE_FOOTER_FILE_SUFFIX); - - statResult = stat(tableFooterFilename->data, &statBuffer); - if (statResult < 0) + tableFile = AllocateFile(filename, "a+"); + if (tableFile == NULL) { - tableFile = AllocateFile(filename, "w"); - if (tableFile == NULL) - { - ereport(ERROR, (errcode_for_file_access(), - errmsg("could not open file \"%s\" for writing: %m", - filename))); - } - - tableFooter = palloc0(sizeof(TableFooter)); - tableFooter->blockRowCount = blockRowCount; - tableFooter->stripeMetadataList = NIL; + ereport(ERROR, (errcode_for_file_access(), + errmsg("could not open file \"%s\" for writing: %m", + filename))); } - else - { - tableFile = AllocateFile(filename, "r+"); - if (tableFile == NULL) - { - ereport(ERROR, (errcode_for_file_access(), - errmsg("could not open file \"%s\" for writing: %m", - filename))); - } - tableFooter = CStoreReadFooter(tableFooterFilename); - } + tableMetadata = ReadTableMetadata(relationId); /* * If stripeMetadataList is not empty, jump to the position right after * the last position. */ - if (tableFooter->stripeMetadataList != NIL) + if (tableMetadata->stripeMetadataList != NIL) { StripeMetadata *lastStripe = NULL; uint64 lastStripeSize = 0; int fseekResult = 0; - lastStripe = llast(tableFooter->stripeMetadataList); + lastStripe = llast(tableMetadata->stripeMetadataList); lastStripeSize += lastStripe->skipListLength; lastStripeSize += lastStripe->dataLength; lastStripeSize += lastStripe->footerLength; @@ -180,8 +155,7 @@ CStoreBeginWrite(Oid relationId, writeState = palloc0(sizeof(TableWriteState)); writeState->relationId = relationId; writeState->tableFile = tableFile; - writeState->tableFooterFilename = tableFooterFilename; - writeState->tableFooter = tableFooter; + writeState->tableMetadata = tableMetadata; writeState->compressionType = compressionType; writeState->stripeMaxRowCount = stripeMaxRowCount; writeState->tupleDescriptor = tupleDescriptor; @@ -215,8 +189,8 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul StripeBuffers *stripeBuffers = writeState->stripeBuffers; StripeSkipList *stripeSkipList = writeState->stripeSkipList; uint32 columnCount = writeState->tupleDescriptor->natts; - TableFooter *tableFooter = writeState->tableFooter; - const uint32 blockRowCount = tableFooter->blockRowCount; + TableMetadata *tableMetadata = writeState->tableMetadata; + const uint32 blockRowCount = tableMetadata->blockRowCount; ColumnBlockData **blockDataArray = writeState->blockDataArray; MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext); @@ -304,7 +278,8 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul * doesn't free it. */ MemoryContextSwitchTo(oldContext); - AppendStripeMetadata(tableFooter, stripeMetadata); + InsertStripeMetadataRow(writeState->relationId, &stripeMetadata); + AppendStripeMetadata(tableMetadata, stripeMetadata); } else { @@ -322,9 +297,6 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul void CStoreEndWrite(TableWriteState *writeState) { - StringInfo tableFooterFilename = NULL; - StringInfo tempTableFooterFileName = NULL; - int renameResult = 0; int columnCount = writeState->tupleDescriptor->natts; StripeBuffers *stripeBuffers = writeState->stripeBuffers; @@ -336,85 +308,20 @@ CStoreEndWrite(TableWriteState *writeState) MemoryContextReset(writeState->stripeWriteContext); MemoryContextSwitchTo(oldContext); - AppendStripeMetadata(writeState->tableFooter, stripeMetadata); + InsertStripeMetadataRow(writeState->relationId, &stripeMetadata); + AppendStripeMetadata(writeState->tableMetadata, stripeMetadata); } SyncAndCloseFile(writeState->tableFile); - tableFooterFilename = writeState->tableFooterFilename; - tempTableFooterFileName = makeStringInfo(); - appendStringInfo(tempTableFooterFileName, "%s%s", tableFooterFilename->data, - CSTORE_TEMP_FILE_SUFFIX); - - CStoreWriteFooter(tempTableFooterFileName, writeState->tableFooter); - - renameResult = rename(tempTableFooterFileName->data, tableFooterFilename->data); - if (renameResult != 0) - { - ereport(ERROR, (errcode_for_file_access(), - errmsg("could not rename file \"%s\" to \"%s\": %m", - tempTableFooterFileName->data, - tableFooterFilename->data))); - } - - pfree(tempTableFooterFileName->data); - pfree(tempTableFooterFileName); - MemoryContextDelete(writeState->stripeWriteContext); - list_free_deep(writeState->tableFooter->stripeMetadataList); - pfree(writeState->tableFooter); - pfree(writeState->tableFooterFilename->data); - pfree(writeState->tableFooterFilename); + list_free_deep(writeState->tableMetadata->stripeMetadataList); pfree(writeState->comparisonFunctionArray); FreeColumnBlockDataArray(writeState->blockDataArray, columnCount); pfree(writeState); } -/* - * CStoreWriteFooter writes the given footer to given file. First, the function - * serializes and writes the footer to the file. Then, the function serializes - * and writes the postscript. Then, the function writes the postscript size as - * the last byte of the file. Last, the function syncs and closes the footer file. - */ -static void -CStoreWriteFooter(StringInfo tableFooterFilename, TableFooter *tableFooter) -{ - FILE *tableFooterFile = NULL; - StringInfo tableFooterBuffer = NULL; - StringInfo postscriptBuffer = NULL; - uint8 postscriptSize = 0; - - tableFooterFile = AllocateFile(tableFooterFilename->data, PG_BINARY_W); - if (tableFooterFile == NULL) - { - ereport(ERROR, (errcode_for_file_access(), - errmsg("could not open file \"%s\" for writing: %m", - tableFooterFilename->data))); - } - - /* write the footer */ - tableFooterBuffer = SerializeTableFooter(tableFooter); - WriteToFile(tableFooterFile, tableFooterBuffer->data, tableFooterBuffer->len); - - /* write the postscript */ - postscriptBuffer = SerializePostScript(tableFooterBuffer->len); - WriteToFile(tableFooterFile, postscriptBuffer->data, postscriptBuffer->len); - - /* write the 1-byte postscript size */ - Assert(postscriptBuffer->len < CSTORE_POSTSCRIPT_SIZE_MAX); - postscriptSize = postscriptBuffer->len; - WriteToFile(tableFooterFile, &postscriptSize, CSTORE_POSTSCRIPT_SIZE_LENGTH); - - SyncAndCloseFile(tableFooterFile); - - pfree(tableFooterBuffer->data); - pfree(tableFooterBuffer); - pfree(postscriptBuffer->data); - pfree(postscriptBuffer); -} - - /* * CreateEmptyStripeBuffers allocates an empty StripeBuffers structure with the given * column count. @@ -501,7 +408,7 @@ FlushStripe(TableWriteState *writeState) StripeFooter *stripeFooter = NULL; uint32 columnIndex = 0; uint32 blockIndex = 0; - TableFooter *tableFooter = writeState->tableFooter; + TableMetadata *tableMetadata = writeState->tableMetadata; FILE *tableFile = writeState->tableFile; StripeBuffers *stripeBuffers = writeState->stripeBuffers; StripeSkipList *stripeSkipList = writeState->stripeSkipList; @@ -509,7 +416,7 @@ FlushStripe(TableWriteState *writeState) TupleDesc tupleDescriptor = writeState->tupleDescriptor; uint32 columnCount = tupleDescriptor->natts; uint32 blockCount = stripeSkipList->blockCount; - uint32 blockRowCount = tableFooter->blockRowCount; + uint32 blockRowCount = tableMetadata->blockRowCount; uint32 lastBlockIndex = stripeBuffers->rowCount / blockRowCount; uint32 lastBlockRowCount = stripeBuffers->rowCount % blockRowCount; @@ -918,13 +825,13 @@ DatumCopy(Datum datum, bool datumTypeByValue, int datumTypeLength) * table footer's stripeMetadataList. */ static void -AppendStripeMetadata(TableFooter *tableFooter, StripeMetadata stripeMetadata) +AppendStripeMetadata(TableMetadata *tableMetadata, StripeMetadata stripeMetadata) { StripeMetadata *stripeMetadataCopy = palloc0(sizeof(StripeMetadata)); memcpy(stripeMetadataCopy, &stripeMetadata, sizeof(StripeMetadata)); - tableFooter->stripeMetadataList = lappend(tableFooter->stripeMetadataList, - stripeMetadataCopy); + tableMetadata->stripeMetadataList = lappend(tableMetadata->stripeMetadataList, + stripeMetadataCopy); } diff --git a/expected/truncate.out b/expected/truncate.out index e16a6ea9f..14119c804 100644 --- a/expected/truncate.out +++ b/expected/truncate.out @@ -72,7 +72,7 @@ SELECT count(*) FROM cstore_truncate_test_compressed; SELECT cstore_table_size('cstore_truncate_test_compressed'); cstore_table_size ------------------- - 26 + 0 (1 row) -- make sure data files still present @@ -82,7 +82,7 @@ SELECT count(*) FROM ( ) AS q1) AS q2; count ------- - 6 + 3 (1 row) INSERT INTO cstore_truncate_test select a, a from generate_series(1, 10) a; From 10fd94a9e3090fef1628fcc10e7fa32cd909edef Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Tue, 8 Sep 2020 19:03:01 -0700 Subject: [PATCH 2/4] Address feedback --- cstore_fdw--1.7.sql | 34 ++++++++--------- cstore_metadata_tables.c | 79 ++++++++++++++++++++++++++++++---------- 2 files changed, 76 insertions(+), 37 deletions(-) diff --git a/cstore_fdw--1.7.sql b/cstore_fdw--1.7.sql index 86589ca90..726085b17 100644 --- a/cstore_fdw--1.7.sql +++ b/cstore_fdw--1.7.sql @@ -59,36 +59,36 @@ CREATE EVENT TRIGGER cstore_drop_event EXECUTE PROCEDURE cstore_drop_trigger(); CREATE TABLE cstore_tables ( - relid oid, - block_row_count int, - version_major bigint, - version_minor bigint, + relid oid NOT NULL, + block_row_count int NOT NULL, + version_major bigint NOT NULL, + version_minor bigint NOT NULL, PRIMARY KEY (relid) ) WITH (user_catalog_table = true); ALTER TABLE cstore_tables SET SCHEMA pg_catalog; CREATE TABLE cstore_stripes ( - relid oid, - stripe bigint, - file_offset bigint, - skiplist_length bigint, - data_length bigint, + relid oid NOT NULL, + stripe bigint NOT NULL, + file_offset bigint NOT NULL, + skiplist_length bigint NOT NULL, + data_length bigint NOT NULL, PRIMARY KEY (relid, stripe), - FOREIGN KEY (relid) REFERENCES cstore_tables(relid) ON DELETE CASCADE + FOREIGN KEY (relid) REFERENCES cstore_tables(relid) ON DELETE CASCADE INITIALLY DEFERRED ) WITH (user_catalog_table = true); ALTER TABLE cstore_stripes SET SCHEMA pg_catalog; CREATE TABLE cstore_stripe_attr ( - relid oid, - stripe bigint, - attr int, - exists_size bigint, - value_size bigint, - skiplist_size bigint, + relid oid NOT NULL, + stripe bigint NOT NULL, + attr int NOT NULL, + exists_size bigint NOT NULL, + value_size bigint NOT NULL, + skiplist_size bigint NOT NULL, PRIMARY KEY (relid, stripe, attr), - FOREIGN KEY (relid, stripe) REFERENCES cstore_stripes(relid, stripe) ON DELETE CASCADE + FOREIGN KEY (relid, stripe) REFERENCES cstore_stripes(relid, stripe) ON DELETE CASCADE INITIALLY DEFERRED ) WITH (user_catalog_table = true); ALTER TABLE cstore_stripe_attr SET SCHEMA pg_catalog; diff --git a/cstore_metadata_tables.c b/cstore_metadata_tables.c index f5168de1e..5c381a029 100644 --- a/cstore_metadata_tables.c +++ b/cstore_metadata_tables.c @@ -50,6 +50,8 @@ static void InsertStripeAttrRow(Oid relid, uint64 stripe, AttrNumber attr, uint64 skiplistSize); static int TableBlockRowCount(Oid relid); static void DeleteTableMetadataRowIfExists(Oid relid); +static void InsertTupleAndEnforceConstraints(Relation rel, HeapTuple heapTuple); +static void DeleteTupleAndEnforceConstraints(Relation rel, HeapTuple heapTuple); static EState * create_estate_for_relation(Relation rel); /* constants for cstore_stripe_attr */ @@ -103,7 +105,7 @@ InitCStoreTableMetadata(Oid relid, int blockRowCount) tuple = heap_form_tuple(tupleDescriptor, values, nulls); - CatalogTupleInsert(cstoreTable, tuple); + InsertTupleAndEnforceConstraints(cstoreTable, tuple); CommandCounterIncrement(); @@ -132,7 +134,7 @@ InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe) HeapTuple tuple = heap_form_tuple(tupleDescriptor, values, nulls); - CatalogTupleInsert(cstoreStripes, tuple); + InsertTupleAndEnforceConstraints(cstoreStripes, tuple); CommandCounterIncrement(); @@ -264,23 +266,7 @@ DeleteTableMetadataRowIfExists(Oid relid) heapTuple = systable_getnext(scanDescriptor); if (HeapTupleIsValid(heapTuple)) { - EState *estate = create_estate_for_relation(cstoreTables); - ResultRelInfo *resultRelInfo = estate->es_result_relation_info; - - ItemPointer tid = &(heapTuple->t_self); - simple_table_tuple_delete(cstoreTables, tid, estate->es_snapshot); - - /* - * Execute AFTER ROW DELETE Triggers to enforce foreign key - * constraints. - */ - ExecARDeleteTriggers(estate, resultRelInfo, - tid, NULL, NULL); - - AfterTriggerEndQuery(estate); - ExecCleanUpTriggerState(estate); - ExecResetTupleTable(estate->es_tupleTable, false); - FreeExecutorState(estate); + DeleteTupleAndEnforceConstraints(cstoreTables, heapTuple); } systable_endscan_ordered(scanDescriptor); @@ -289,6 +275,59 @@ DeleteTableMetadataRowIfExists(Oid relid) } +/* + * InsertTupleAndEnforceConstraints inserts a tuple into a relation and + * makes sure constraints (e.g. FK constraints, NOT NULL, ...) are enforced. + */ +static void +InsertTupleAndEnforceConstraints(Relation rel, HeapTuple heapTuple) +{ + EState *estate = NULL; + TupleTableSlot *slot = NULL; + + estate = create_estate_for_relation(rel); + slot = ExecInitExtraTupleSlot(estate, RelationGetDescr(rel), &TTSOpsHeapTuple); + ExecStoreHeapTuple(heapTuple, slot, false); + + ExecOpenIndices(estate->es_result_relation_info, false); + + /* ExecSimpleRelationInsert executes any constraints */ + ExecSimpleRelationInsert(estate, slot); + + ExecCloseIndices(estate->es_result_relation_info); + + AfterTriggerEndQuery(estate); + ExecCleanUpTriggerState(estate); + ExecResetTupleTable(estate->es_tupleTable, false); + FreeExecutorState(estate); +} + + + +/* + * DeleteTupleAndEnforceConstraints deletes a tuple from a relation and + * makes sure constraints (e.g. FK constraints) are enforced. + */ +static void +DeleteTupleAndEnforceConstraints(Relation rel, HeapTuple heapTuple) +{ + EState *estate = create_estate_for_relation(rel); + ResultRelInfo *resultRelInfo = estate->es_result_relation_info; + + ItemPointer tid = &(heapTuple->t_self); + simple_table_tuple_delete(rel, tid, estate->es_snapshot); + + /* execute AFTER ROW DELETE Triggers to enforce constraints */ + ExecARDeleteTriggers(estate, resultRelInfo, + tid, NULL, NULL); + + AfterTriggerEndQuery(estate); + ExecCleanUpTriggerState(estate); + ExecResetTupleTable(estate->es_tupleTable, false); + FreeExecutorState(estate); +} + + /* * Based on a similar function from * postgres/src/backend/replication/logical/worker.c. @@ -370,7 +409,7 @@ InsertStripeAttrRow(Oid relid, uint64 stripe, AttrNumber attr, HeapTuple tuple = heap_form_tuple(tupleDescriptor, values, nulls); - CatalogTupleInsert(cstoreStripeAttrs, tuple); + InsertTupleAndEnforceConstraints(cstoreStripeAttrs, tuple); CommandCounterIncrement(); From 35a52a6fe16e2fa761b1df43c096b8af333731ac Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Wed, 9 Sep 2020 11:04:27 -0700 Subject: [PATCH 3/4] Use cstore namespace instead of pg_catalog. --- cstore_fdw--1.7.sql | 18 +++++++----------- cstore_metadata_tables.c | 21 +++++++++++++++------ 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/cstore_fdw--1.7.sql b/cstore_fdw--1.7.sql index 726085b17..7a0c9c7b8 100644 --- a/cstore_fdw--1.7.sql +++ b/cstore_fdw--1.7.sql @@ -3,6 +3,8 @@ -- complain if script is sourced in psql, rather than via CREATE EXTENSION \echo Use "CREATE EXTENSION cstore_fdw" to load this file. \quit +CREATE SCHEMA cstore; + CREATE FUNCTION cstore_fdw_handler() RETURNS fdw_handler AS 'MODULE_PATHNAME' @@ -58,7 +60,7 @@ CREATE EVENT TRIGGER cstore_drop_event ON SQL_DROP EXECUTE PROCEDURE cstore_drop_trigger(); -CREATE TABLE cstore_tables ( +CREATE TABLE cstore.cstore_tables ( relid oid NOT NULL, block_row_count int NOT NULL, version_major bigint NOT NULL, @@ -66,21 +68,17 @@ CREATE TABLE cstore_tables ( PRIMARY KEY (relid) ) WITH (user_catalog_table = true); -ALTER TABLE cstore_tables SET SCHEMA pg_catalog; - -CREATE TABLE cstore_stripes ( +CREATE TABLE cstore.cstore_stripes ( relid oid NOT NULL, stripe bigint NOT NULL, file_offset bigint NOT NULL, skiplist_length bigint NOT NULL, data_length bigint NOT NULL, PRIMARY KEY (relid, stripe), - FOREIGN KEY (relid) REFERENCES cstore_tables(relid) ON DELETE CASCADE INITIALLY DEFERRED + FOREIGN KEY (relid) REFERENCES cstore.cstore_tables(relid) ON DELETE CASCADE INITIALLY DEFERRED ) WITH (user_catalog_table = true); -ALTER TABLE cstore_stripes SET SCHEMA pg_catalog; - -CREATE TABLE cstore_stripe_attr ( +CREATE TABLE cstore.cstore_stripe_attr ( relid oid NOT NULL, stripe bigint NOT NULL, attr int NOT NULL, @@ -88,7 +86,5 @@ CREATE TABLE cstore_stripe_attr ( value_size bigint NOT NULL, skiplist_size bigint NOT NULL, PRIMARY KEY (relid, stripe, attr), - FOREIGN KEY (relid, stripe) REFERENCES cstore_stripes(relid, stripe) ON DELETE CASCADE INITIALLY DEFERRED + FOREIGN KEY (relid, stripe) REFERENCES cstore.cstore_stripes(relid, stripe) ON DELETE CASCADE INITIALLY DEFERRED ) WITH (user_catalog_table = true); - -ALTER TABLE cstore_stripe_attr SET SCHEMA pg_catalog; diff --git a/cstore_metadata_tables.c b/cstore_metadata_tables.c index 5c381a029..39e852c55 100644 --- a/cstore_metadata_tables.c +++ b/cstore_metadata_tables.c @@ -22,6 +22,7 @@ #include "catalog/pg_namespace.h" #include "catalog/pg_collation.h" #include "catalog/pg_type.h" +#include "catalog/namespace.h" #include "commands/defrem.h" #include "commands/trigger.h" #include "executor/executor.h" @@ -45,6 +46,7 @@ static Oid CStoreStripesRelationId(void); static Oid CStoreStripesIndexRelationId(void); static Oid CStoreTablesRelationId(void); static Oid CStoreTablesIndexRelationId(void); +static Oid CStoreNamespaceId(void); static void InsertStripeAttrRow(Oid relid, uint64 stripe, AttrNumber attr, uint64 existsSize, uint64 valuesSize, uint64 skiplistSize); @@ -494,7 +496,7 @@ ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount) static Oid CStoreStripeAttrRelationId(void) { - return get_relname_relid("cstore_stripe_attr", PG_CATALOG_NAMESPACE); + return get_relname_relid("cstore_stripe_attr", CStoreNamespaceId()); } @@ -505,7 +507,7 @@ CStoreStripeAttrRelationId(void) static Oid CStoreStripeAttrIndexRelationId(void) { - return get_relname_relid("cstore_stripe_attr_pkey", PG_CATALOG_NAMESPACE); + return get_relname_relid("cstore_stripe_attr_pkey", CStoreNamespaceId()); } @@ -516,7 +518,7 @@ CStoreStripeAttrIndexRelationId(void) static Oid CStoreStripesRelationId(void) { - return get_relname_relid("cstore_stripes", PG_CATALOG_NAMESPACE); + return get_relname_relid("cstore_stripes", CStoreNamespaceId()); } @@ -527,7 +529,7 @@ CStoreStripesRelationId(void) static Oid CStoreStripesIndexRelationId(void) { - return get_relname_relid("cstore_stripes_pkey", PG_CATALOG_NAMESPACE); + return get_relname_relid("cstore_stripes_pkey", CStoreNamespaceId()); } @@ -538,7 +540,7 @@ CStoreStripesIndexRelationId(void) static Oid CStoreTablesRelationId(void) { - return get_relname_relid("cstore_tables", PG_CATALOG_NAMESPACE); + return get_relname_relid("cstore_tables", CStoreNamespaceId()); } @@ -549,5 +551,12 @@ CStoreTablesRelationId(void) static Oid CStoreTablesIndexRelationId(void) { - return get_relname_relid("cstore_tables_pkey", PG_CATALOG_NAMESPACE); + return get_relname_relid("cstore_tables_pkey", CStoreNamespaceId()); +} + + +static Oid +CStoreNamespaceId(void) +{ + return get_namespace_oid("cstore", false); } From 0d4e249c97ba7f65a6dcacb4fc8527624b877d4b Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Wed, 9 Sep 2020 14:17:30 -0700 Subject: [PATCH 4/4] Reuse the same state for multiple inserts --- cstore_metadata_tables.c | 304 ++++++++++++++++++++------------------- 1 file changed, 157 insertions(+), 147 deletions(-) diff --git a/cstore_metadata_tables.c b/cstore_metadata_tables.c index 39e852c55..3843e4cd6 100644 --- a/cstore_metadata_tables.c +++ b/cstore_metadata_tables.c @@ -40,6 +40,12 @@ #include "cstore_metadata_serialization.h" +typedef struct +{ + Relation rel; + EState *estate; +} ModifyState; + static Oid CStoreStripeAttrRelationId(void); static Oid CStoreStripeAttrIndexRelationId(void); static Oid CStoreStripesRelationId(void); @@ -47,13 +53,13 @@ static Oid CStoreStripesIndexRelationId(void); static Oid CStoreTablesRelationId(void); static Oid CStoreTablesIndexRelationId(void); static Oid CStoreNamespaceId(void); -static void InsertStripeAttrRow(Oid relid, uint64 stripe, AttrNumber attr, - uint64 existsSize, uint64 valuesSize, - uint64 skiplistSize); static int TableBlockRowCount(Oid relid); static void DeleteTableMetadataRowIfExists(Oid relid); -static void InsertTupleAndEnforceConstraints(Relation rel, HeapTuple heapTuple); -static void DeleteTupleAndEnforceConstraints(Relation rel, HeapTuple heapTuple); +static ModifyState * StartModifyRelation(Relation rel); +static void InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values, + bool *nulls); +static void DeleteTupleAndEnforceConstraints(ModifyState *state, HeapTuple heapTuple); +static void FinishModifyRelation(ModifyState *state); static EState * create_estate_for_relation(Relation rel); /* constants for cstore_stripe_attr */ @@ -86,10 +92,9 @@ static EState * create_estate_for_relation(Relation rel); void InitCStoreTableMetadata(Oid relid, int blockRowCount) { - Oid cstoreTableOid = InvalidOid; - Relation cstoreTable = NULL; - TupleDesc tupleDescriptor = NULL; - HeapTuple tuple = NULL; + Oid cstoreTablesOid = InvalidOid; + Relation cstoreTables = NULL; + ModifyState *modifyState = NULL; bool nulls[Natts_cstore_tables] = { 0 }; Datum values[Natts_cstore_tables] = { @@ -101,17 +106,16 @@ InitCStoreTableMetadata(Oid relid, int blockRowCount) DeleteTableMetadataRowIfExists(relid); - cstoreTableOid = CStoreTablesRelationId(); - cstoreTable = heap_open(cstoreTableOid, RowExclusiveLock); - tupleDescriptor = RelationGetDescr(cstoreTable); + cstoreTablesOid = CStoreTablesRelationId(); + cstoreTables = heap_open(cstoreTablesOid, RowExclusiveLock); - tuple = heap_form_tuple(tupleDescriptor, values, nulls); - - InsertTupleAndEnforceConstraints(cstoreTable, tuple); + modifyState = StartModifyRelation(cstoreTables); + InsertTupleAndEnforceConstraints(modifyState, values, nulls); + FinishModifyRelation(modifyState); CommandCounterIncrement(); - heap_close(cstoreTable, NoLock); + heap_close(cstoreTables, NoLock); } @@ -132,11 +136,10 @@ InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe) Oid cstoreStripesOid = CStoreStripesRelationId(); Relation cstoreStripes = heap_open(cstoreStripesOid, RowExclusiveLock); - TupleDesc tupleDescriptor = RelationGetDescr(cstoreStripes); - HeapTuple tuple = heap_form_tuple(tupleDescriptor, values, nulls); - - InsertTupleAndEnforceConstraints(cstoreStripes, tuple); + ModifyState *modifyState = StartModifyRelation(cstoreStripes); + InsertTupleAndEnforceConstraints(modifyState, values, nulls); + FinishModifyRelation(modifyState); CommandCounterIncrement(); @@ -268,7 +271,9 @@ DeleteTableMetadataRowIfExists(Oid relid) heapTuple = systable_getnext(scanDescriptor); if (HeapTupleIsValid(heapTuple)) { - DeleteTupleAndEnforceConstraints(cstoreTables, heapTuple); + ModifyState *modifyState = StartModifyRelation(cstoreTables); + DeleteTupleAndEnforceConstraints(modifyState, heapTuple); + FinishModifyRelation(modifyState); } systable_endscan_ordered(scanDescriptor); @@ -277,144 +282,33 @@ DeleteTableMetadataRowIfExists(Oid relid) } -/* - * InsertTupleAndEnforceConstraints inserts a tuple into a relation and - * makes sure constraints (e.g. FK constraints, NOT NULL, ...) are enforced. - */ -static void -InsertTupleAndEnforceConstraints(Relation rel, HeapTuple heapTuple) -{ - EState *estate = NULL; - TupleTableSlot *slot = NULL; - - estate = create_estate_for_relation(rel); - slot = ExecInitExtraTupleSlot(estate, RelationGetDescr(rel), &TTSOpsHeapTuple); - ExecStoreHeapTuple(heapTuple, slot, false); - - ExecOpenIndices(estate->es_result_relation_info, false); - - /* ExecSimpleRelationInsert executes any constraints */ - ExecSimpleRelationInsert(estate, slot); - - ExecCloseIndices(estate->es_result_relation_info); - - AfterTriggerEndQuery(estate); - ExecCleanUpTriggerState(estate); - ExecResetTupleTable(estate->es_tupleTable, false); - FreeExecutorState(estate); -} - - - -/* - * DeleteTupleAndEnforceConstraints deletes a tuple from a relation and - * makes sure constraints (e.g. FK constraints) are enforced. - */ -static void -DeleteTupleAndEnforceConstraints(Relation rel, HeapTuple heapTuple) -{ - EState *estate = create_estate_for_relation(rel); - ResultRelInfo *resultRelInfo = estate->es_result_relation_info; - - ItemPointer tid = &(heapTuple->t_self); - simple_table_tuple_delete(rel, tid, estate->es_snapshot); - - /* execute AFTER ROW DELETE Triggers to enforce constraints */ - ExecARDeleteTriggers(estate, resultRelInfo, - tid, NULL, NULL); - - AfterTriggerEndQuery(estate); - ExecCleanUpTriggerState(estate); - ExecResetTupleTable(estate->es_tupleTable, false); - FreeExecutorState(estate); -} - - -/* - * Based on a similar function from - * postgres/src/backend/replication/logical/worker.c. - * - * Executor state preparation for evaluation of constraint expressions, - * indexes and triggers. - * - * This is based on similar code in copy.c - */ -static EState * -create_estate_for_relation(Relation rel) -{ - EState *estate; - ResultRelInfo *resultRelInfo; - RangeTblEntry *rte; - - estate = CreateExecutorState(); - - rte = makeNode(RangeTblEntry); - rte->rtekind = RTE_RELATION; - rte->relid = RelationGetRelid(rel); - rte->relkind = rel->rd_rel->relkind; - rte->rellockmode = AccessShareLock; - ExecInitRangeTable(estate, list_make1(rte)); - - resultRelInfo = makeNode(ResultRelInfo); - InitResultRelInfo(resultRelInfo, rel, 1, NULL, 0); - - estate->es_result_relations = resultRelInfo; - estate->es_num_result_relations = 1; - estate->es_result_relation_info = resultRelInfo; - - estate->es_output_cid = GetCurrentCommandId(true); - - /* Prepare to catch AFTER triggers. */ - AfterTriggerBeginQuery(); - - return estate; -} - - /* * SaveStripeFooter stores give StripeFooter as cstore_stripe_attr records. */ void SaveStripeFooter(Oid relid, uint64 stripe, StripeFooter *footer) { - for (AttrNumber attr = 1; attr <= footer->columnCount; attr++) - { - InsertStripeAttrRow(relid, stripe, attr, - footer->existsSizeArray[attr - 1], - footer->valueSizeArray[attr - 1], - footer->skipListSizeArray[attr - 1]); - } -} - - -/* - * InsertStripeAttrRow adds a row to cstore_stripe_attr. - */ -static void -InsertStripeAttrRow(Oid relid, uint64 stripe, AttrNumber attr, - uint64 existsSize, uint64 valuesSize, - uint64 skiplistSize) -{ - bool nulls[Natts_cstore_stripe_attr] = { 0 }; - Datum values[Natts_cstore_stripe_attr] = { - ObjectIdGetDatum(relid), - Int64GetDatum(stripe), - Int16GetDatum(attr), - Int64GetDatum(existsSize), - Int64GetDatum(valuesSize), - Int64GetDatum(skiplistSize) - }; - Oid cstoreStripeAttrOid = CStoreStripeAttrRelationId(); Relation cstoreStripeAttrs = heap_open(cstoreStripeAttrOid, RowExclusiveLock); - TupleDesc tupleDescriptor = RelationGetDescr(cstoreStripeAttrs); - HeapTuple tuple = heap_form_tuple(tupleDescriptor, values, nulls); + ModifyState *modifyState = StartModifyRelation(cstoreStripeAttrs); - InsertTupleAndEnforceConstraints(cstoreStripeAttrs, tuple); + for (AttrNumber attr = 1; attr <= footer->columnCount; attr++) + { + bool nulls[Natts_cstore_stripe_attr] = { 0 }; + Datum values[Natts_cstore_stripe_attr] = { + ObjectIdGetDatum(relid), + Int64GetDatum(stripe), + Int16GetDatum(attr), + Int64GetDatum(footer->existsSizeArray[attr - 1]), + Int64GetDatum(footer->valueSizeArray[attr - 1]), + Int64GetDatum(footer->skipListSizeArray[attr - 1]) + }; - CommandCounterIncrement(); + InsertTupleAndEnforceConstraints(modifyState, values, nulls); + } + FinishModifyRelation(modifyState); heap_close(cstoreStripeAttrs, NoLock); } @@ -489,6 +383,118 @@ ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount) } +/* + * StartModifyRelation allocates resources for modifications. + */ +static ModifyState * +StartModifyRelation(Relation rel) +{ + ModifyState *modifyState = NULL; + EState *estate = create_estate_for_relation(rel); + + /* ExecSimpleRelationInsert, ... require caller to open indexes */ + ExecOpenIndices(estate->es_result_relation_info, false); + + modifyState = palloc(sizeof(ModifyState)); + modifyState->rel = rel; + modifyState->estate = estate; + + return modifyState; +} + + +/* + * InsertTupleAndEnforceConstraints inserts a tuple into a relation and makes + * sure constraints are enforced and indexes are updated. + */ +static void +InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values, bool *nulls) +{ + TupleDesc tupleDescriptor = RelationGetDescr(state->rel); + HeapTuple tuple = heap_form_tuple(tupleDescriptor, values, nulls); + TupleTableSlot *slot = ExecInitExtraTupleSlot(state->estate, tupleDescriptor, + &TTSOpsHeapTuple); + ExecStoreHeapTuple(tuple, slot, false); + + /* use ExecSimpleRelationInsert to enforce constraints */ + ExecSimpleRelationInsert(state->estate, slot); +} + + +/* + * DeleteTupleAndEnforceConstraints deletes a tuple from a relation and + * makes sure constraints (e.g. FK constraints) are enforced. + */ +static void +DeleteTupleAndEnforceConstraints(ModifyState *state, HeapTuple heapTuple) +{ + EState *estate = state->estate; + ResultRelInfo *resultRelInfo = estate->es_result_relation_info; + + ItemPointer tid = &(heapTuple->t_self); + simple_table_tuple_delete(state->rel, tid, estate->es_snapshot); + + /* execute AFTER ROW DELETE Triggers to enforce constraints */ + ExecARDeleteTriggers(estate, resultRelInfo, tid, NULL, NULL); +} + + +/* + * FinishModifyRelation cleans up resources after modifications are done. + */ +static void +FinishModifyRelation(ModifyState *state) +{ + ExecCloseIndices(state->estate->es_result_relation_info); + + AfterTriggerEndQuery(state->estate); + ExecCleanUpTriggerState(state->estate); + ExecResetTupleTable(state->estate->es_tupleTable, false); + FreeExecutorState(state->estate); +} + + +/* + * Based on a similar function from + * postgres/src/backend/replication/logical/worker.c. + * + * Executor state preparation for evaluation of constraint expressions, + * indexes and triggers. + * + * This is based on similar code in copy.c + */ +static EState * +create_estate_for_relation(Relation rel) +{ + EState *estate; + ResultRelInfo *resultRelInfo; + RangeTblEntry *rte; + + estate = CreateExecutorState(); + + rte = makeNode(RangeTblEntry); + rte->rtekind = RTE_RELATION; + rte->relid = RelationGetRelid(rel); + rte->relkind = rel->rd_rel->relkind; + rte->rellockmode = AccessShareLock; + ExecInitRangeTable(estate, list_make1(rte)); + + resultRelInfo = makeNode(ResultRelInfo); + InitResultRelInfo(resultRelInfo, rel, 1, NULL, 0); + + estate->es_result_relations = resultRelInfo; + estate->es_num_result_relations = 1; + estate->es_result_relation_info = resultRelInfo; + + estate->es_output_cid = GetCurrentCommandId(true); + + /* Prepare to catch AFTER triggers. */ + AfterTriggerBeginQuery(); + + return estate; +} + + /* * CStoreStripeAttrRelationId returns relation id of cstore_stripe_attr. * TODO: should we cache this similar to citus? @@ -555,6 +561,10 @@ CStoreTablesIndexRelationId(void) } +/* + * CStoreNamespaceId returns namespace id of the schema we store cstore + * related tables. + */ static Oid CStoreNamespaceId(void) {