mirror of https://github.com/citusdata/citus.git
Merge pull request #2 from citusdata/table_footer_to_metadata_tables
Move table footer to metadata tables
merge-cstore-pykello
commit
407892a9dd
15
cstore.c
15
cstore.c
|
@ -130,6 +130,8 @@ InitializeCStoreTableFile(Oid relationId, Relation relation, CStoreOptions *csto
|
||||||
TableWriteState *writeState = NULL;
|
TableWriteState *writeState = NULL;
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(relation);
|
TupleDesc tupleDescriptor = RelationGetDescr(relation);
|
||||||
|
|
||||||
|
InitCStoreTableMetadata(relationId, cstoreOptions->blockRowCount);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Initialize state to write to the cstore file. This creates an
|
* Initialize state to write to the cstore file. This creates an
|
||||||
* empty data file and a valid footer file for the table.
|
* empty data file and a valid footer file for the table.
|
||||||
|
@ -183,19 +185,6 @@ void
|
||||||
DeleteCStoreTableFiles(char *filename)
|
DeleteCStoreTableFiles(char *filename)
|
||||||
{
|
{
|
||||||
int dataFileRemoved = 0;
|
int dataFileRemoved = 0;
|
||||||
int footerFileRemoved = 0;
|
|
||||||
|
|
||||||
StringInfo tableFooterFilename = makeStringInfo();
|
|
||||||
appendStringInfo(tableFooterFilename, "%s%s", filename, CSTORE_FOOTER_FILE_SUFFIX);
|
|
||||||
|
|
||||||
/* delete the footer file */
|
|
||||||
footerFileRemoved = unlink(tableFooterFilename->data);
|
|
||||||
if (footerFileRemoved != 0)
|
|
||||||
{
|
|
||||||
ereport(WARNING, (errcode_for_file_access(),
|
|
||||||
errmsg("could not delete file \"%s\": %m",
|
|
||||||
tableFooterFilename->data)));
|
|
||||||
}
|
|
||||||
|
|
||||||
/* delete the data file */
|
/* delete the data file */
|
||||||
dataFileRemoved = unlink(filename);
|
dataFileRemoved = unlink(filename);
|
||||||
|
|
18
cstore.h
18
cstore.h
|
@ -46,8 +46,6 @@
|
||||||
|
|
||||||
/* miscellaneous defines */
|
/* miscellaneous defines */
|
||||||
#define CSTORE_FDW_NAME "cstore_fdw"
|
#define CSTORE_FDW_NAME "cstore_fdw"
|
||||||
#define CSTORE_FOOTER_FILE_SUFFIX ".footer"
|
|
||||||
#define CSTORE_TEMP_FILE_SUFFIX ".tmp"
|
|
||||||
#define CSTORE_TUPLE_COST_MULTIPLIER 10
|
#define CSTORE_TUPLE_COST_MULTIPLIER 10
|
||||||
#define CSTORE_POSTSCRIPT_SIZE_LENGTH 1
|
#define CSTORE_POSTSCRIPT_SIZE_LENGTH 1
|
||||||
#define CSTORE_POSTSCRIPT_SIZE_MAX 256
|
#define CSTORE_POSTSCRIPT_SIZE_MAX 256
|
||||||
|
@ -91,12 +89,12 @@ typedef struct StripeMetadata
|
||||||
} StripeMetadata;
|
} StripeMetadata;
|
||||||
|
|
||||||
|
|
||||||
/* TableFooter represents the footer of a cstore file. */
|
/* TableMetadata represents the metadata of a cstore file. */
|
||||||
typedef struct TableFooter
|
typedef struct TableMetadata
|
||||||
{
|
{
|
||||||
List *stripeMetadataList;
|
List *stripeMetadataList;
|
||||||
uint64 blockRowCount;
|
uint64 blockRowCount;
|
||||||
} TableFooter;
|
} TableMetadata;
|
||||||
|
|
||||||
|
|
||||||
/* ColumnBlockSkipNode contains statistics for a ColumnBlockData. */
|
/* ColumnBlockSkipNode contains statistics for a ColumnBlockData. */
|
||||||
|
@ -206,7 +204,7 @@ typedef struct TableReadState
|
||||||
Oid relationId;
|
Oid relationId;
|
||||||
|
|
||||||
FILE *tableFile;
|
FILE *tableFile;
|
||||||
TableFooter *tableFooter;
|
TableMetadata *tableMetadata;
|
||||||
TupleDesc tupleDescriptor;
|
TupleDesc tupleDescriptor;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -231,8 +229,7 @@ typedef struct TableWriteState
|
||||||
{
|
{
|
||||||
Oid relationId;
|
Oid relationId;
|
||||||
FILE *tableFile;
|
FILE *tableFile;
|
||||||
TableFooter *tableFooter;
|
TableMetadata *tableMetadata;
|
||||||
StringInfo tableFooterFilename;
|
|
||||||
CompressionType compressionType;
|
CompressionType compressionType;
|
||||||
TupleDesc tupleDescriptor;
|
TupleDesc tupleDescriptor;
|
||||||
FmgrInfo **comparisonFunctionArray;
|
FmgrInfo **comparisonFunctionArray;
|
||||||
|
@ -277,7 +274,6 @@ extern void CStoreEndWrite(TableWriteState *state);
|
||||||
extern TableReadState * CStoreBeginRead(Oid relationId, const char *filename,
|
extern TableReadState * CStoreBeginRead(Oid relationId, const char *filename,
|
||||||
TupleDesc tupleDescriptor,
|
TupleDesc tupleDescriptor,
|
||||||
List *projectedColumnList, List *qualConditions);
|
List *projectedColumnList, List *qualConditions);
|
||||||
extern TableFooter * CStoreReadFooter(StringInfo tableFooterFilename);
|
|
||||||
extern bool CStoreReadFinished(TableReadState *state);
|
extern bool CStoreReadFinished(TableReadState *state);
|
||||||
extern bool CStoreReadNextRow(TableReadState *state, Datum *columnValues,
|
extern bool CStoreReadNextRow(TableReadState *state, Datum *columnValues,
|
||||||
bool *columnNulls);
|
bool *columnNulls);
|
||||||
|
@ -298,6 +294,8 @@ extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressio
|
||||||
/* cstore_metadata_tables.c */
|
/* cstore_metadata_tables.c */
|
||||||
extern void SaveStripeFooter(Oid relid, uint64 stripe, StripeFooter *footer);
|
extern void SaveStripeFooter(Oid relid, uint64 stripe, StripeFooter *footer);
|
||||||
extern StripeFooter * ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount);
|
extern StripeFooter * ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount);
|
||||||
|
extern void InitCStoreTableMetadata(Oid relid, int blockRowCount);
|
||||||
|
extern void InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe);
|
||||||
|
extern TableMetadata * ReadTableMetadata(Oid relid);
|
||||||
|
|
||||||
#endif /* CSTORE_H */
|
#endif /* CSTORE_H */
|
||||||
|
|
22
cstore.proto
22
cstore.proto
|
@ -22,25 +22,3 @@ message ColumnBlockSkipNode {
|
||||||
message ColumnBlockSkipList {
|
message ColumnBlockSkipList {
|
||||||
repeated ColumnBlockSkipNode blockSkipNodeArray = 1;
|
repeated ColumnBlockSkipNode blockSkipNodeArray = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
message StripeMetadata {
|
|
||||||
optional uint64 fileOffset = 1;
|
|
||||||
optional uint64 skipListLength = 2;
|
|
||||||
optional uint64 dataLength = 3;
|
|
||||||
optional uint64 footerLength = 4;
|
|
||||||
optional uint64 id = 5;
|
|
||||||
}
|
|
||||||
|
|
||||||
message TableFooter {
|
|
||||||
repeated StripeMetadata stripeMetadataArray = 1;
|
|
||||||
optional uint32 blockRowCount = 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
message PostScript {
|
|
||||||
optional uint64 tableFooterLength = 1;
|
|
||||||
optional uint64 versionMajor = 2;
|
|
||||||
optional uint64 versionMinor = 3;
|
|
||||||
|
|
||||||
// Leave this last in the record
|
|
||||||
optional string magicNumber = 8000;
|
|
||||||
}
|
|
||||||
|
|
|
@ -3,6 +3,8 @@
|
||||||
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
|
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
|
||||||
\echo Use "CREATE EXTENSION cstore_fdw" to load this file. \quit
|
\echo Use "CREATE EXTENSION cstore_fdw" to load this file. \quit
|
||||||
|
|
||||||
|
CREATE SCHEMA cstore;
|
||||||
|
|
||||||
CREATE FUNCTION cstore_fdw_handler()
|
CREATE FUNCTION cstore_fdw_handler()
|
||||||
RETURNS fdw_handler
|
RETURNS fdw_handler
|
||||||
AS 'MODULE_PATHNAME'
|
AS 'MODULE_PATHNAME'
|
||||||
|
@ -58,17 +60,31 @@ CREATE EVENT TRIGGER cstore_drop_event
|
||||||
ON SQL_DROP
|
ON SQL_DROP
|
||||||
EXECUTE PROCEDURE cstore_drop_trigger();
|
EXECUTE PROCEDURE cstore_drop_trigger();
|
||||||
|
|
||||||
CREATE TABLE cstore_stripe_attr (
|
CREATE TABLE cstore.cstore_tables (
|
||||||
relid oid,
|
relid oid NOT NULL,
|
||||||
stripe bigint,
|
block_row_count int NOT NULL,
|
||||||
attr int,
|
version_major bigint NOT NULL,
|
||||||
exists_size bigint,
|
version_minor bigint NOT NULL,
|
||||||
value_size bigint,
|
PRIMARY KEY (relid)
|
||||||
skiplist_size bigint
|
|
||||||
) WITH (user_catalog_table = true);
|
) WITH (user_catalog_table = true);
|
||||||
|
|
||||||
CREATE INDEX cstore_stripe_attr_idx
|
CREATE TABLE cstore.cstore_stripes (
|
||||||
ON cstore_stripe_attr
|
relid oid NOT NULL,
|
||||||
USING BTREE(relid, stripe, attr);
|
stripe bigint NOT NULL,
|
||||||
|
file_offset bigint NOT NULL,
|
||||||
|
skiplist_length bigint NOT NULL,
|
||||||
|
data_length bigint NOT NULL,
|
||||||
|
PRIMARY KEY (relid, stripe),
|
||||||
|
FOREIGN KEY (relid) REFERENCES cstore.cstore_tables(relid) ON DELETE CASCADE INITIALLY DEFERRED
|
||||||
|
) WITH (user_catalog_table = true);
|
||||||
|
|
||||||
ALTER TABLE cstore_stripe_attr SET SCHEMA pg_catalog;
|
CREATE TABLE cstore.cstore_stripe_attr (
|
||||||
|
relid oid NOT NULL,
|
||||||
|
stripe bigint NOT NULL,
|
||||||
|
attr int NOT NULL,
|
||||||
|
exists_size bigint NOT NULL,
|
||||||
|
value_size bigint NOT NULL,
|
||||||
|
skiplist_size bigint NOT NULL,
|
||||||
|
PRIMARY KEY (relid, stripe, attr),
|
||||||
|
FOREIGN KEY (relid, stripe) REFERENCES cstore.cstore_stripes(relid, stripe) ON DELETE CASCADE INITIALLY DEFERRED
|
||||||
|
) WITH (user_catalog_table = true);
|
||||||
|
|
17
cstore_fdw.c
17
cstore_fdw.c
|
@ -426,6 +426,7 @@ CStoreProcessUtility(Node * parseTree, const char * queryString,
|
||||||
RemoveCStoreDatabaseDirectory(databaseOid);
|
RemoveCStoreDatabaseDirectory(databaseOid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* handle other utility statements */
|
/* handle other utility statements */
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -1026,11 +1027,8 @@ cstore_table_size(PG_FUNCTION_ARGS)
|
||||||
int64 tableSize = 0;
|
int64 tableSize = 0;
|
||||||
CStoreOptions *cstoreOptions = NULL;
|
CStoreOptions *cstoreOptions = NULL;
|
||||||
char *dataFilename = NULL;
|
char *dataFilename = NULL;
|
||||||
StringInfo footerFilename = NULL;
|
|
||||||
int dataFileStatResult = 0;
|
int dataFileStatResult = 0;
|
||||||
int footerFileStatResult = 0;
|
|
||||||
struct stat dataFileStatBuffer;
|
struct stat dataFileStatBuffer;
|
||||||
struct stat footerFileStatBuffer;
|
|
||||||
|
|
||||||
bool cstoreTable = CStoreTable(relationId);
|
bool cstoreTable = CStoreTable(relationId);
|
||||||
if (!cstoreTable)
|
if (!cstoreTable)
|
||||||
|
@ -1048,20 +1046,7 @@ cstore_table_size(PG_FUNCTION_ARGS)
|
||||||
errmsg("could not stat file \"%s\": %m", dataFilename)));
|
errmsg("could not stat file \"%s\": %m", dataFilename)));
|
||||||
}
|
}
|
||||||
|
|
||||||
footerFilename = makeStringInfo();
|
|
||||||
appendStringInfo(footerFilename, "%s%s", dataFilename,
|
|
||||||
CSTORE_FOOTER_FILE_SUFFIX);
|
|
||||||
|
|
||||||
footerFileStatResult = stat(footerFilename->data, &footerFileStatBuffer);
|
|
||||||
if (footerFileStatResult != 0)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode_for_file_access(),
|
|
||||||
errmsg("could not stat file \"%s\": %m",
|
|
||||||
footerFilename->data)));
|
|
||||||
}
|
|
||||||
|
|
||||||
tableSize += dataFileStatBuffer.st_size;
|
tableSize += dataFileStatBuffer.st_size;
|
||||||
tableSize += footerFileStatBuffer.st_size;
|
|
||||||
|
|
||||||
PG_RETURN_INT64(tableSize);
|
PG_RETURN_INT64(tableSize);
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,98 +28,6 @@ static Datum ProtobufBinaryToDatum(ProtobufCBinaryData protobufBinary,
|
||||||
bool typeByValue, int typeLength);
|
bool typeByValue, int typeLength);
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* SerializePostScript serializes the given postscript and returns the result as
|
|
||||||
* a StringInfo.
|
|
||||||
*/
|
|
||||||
StringInfo
|
|
||||||
SerializePostScript(uint64 tableFooterLength)
|
|
||||||
{
|
|
||||||
StringInfo postscriptBuffer = NULL;
|
|
||||||
Protobuf__PostScript protobufPostScript = PROTOBUF__POST_SCRIPT__INIT;
|
|
||||||
uint8 *postscriptData = NULL;
|
|
||||||
uint32 postscriptSize = 0;
|
|
||||||
|
|
||||||
protobufPostScript.has_tablefooterlength = true;
|
|
||||||
protobufPostScript.tablefooterlength = tableFooterLength;
|
|
||||||
protobufPostScript.has_versionmajor = true;
|
|
||||||
protobufPostScript.versionmajor = CSTORE_VERSION_MAJOR;
|
|
||||||
protobufPostScript.has_versionminor = true;
|
|
||||||
protobufPostScript.versionminor = CSTORE_VERSION_MINOR;
|
|
||||||
protobufPostScript.magicnumber = pstrdup(CSTORE_MAGIC_NUMBER);
|
|
||||||
|
|
||||||
postscriptSize = protobuf__post_script__get_packed_size(&protobufPostScript);
|
|
||||||
postscriptData = palloc0(postscriptSize);
|
|
||||||
protobuf__post_script__pack(&protobufPostScript, postscriptData);
|
|
||||||
|
|
||||||
postscriptBuffer = palloc0(sizeof(StringInfoData));
|
|
||||||
postscriptBuffer->len = postscriptSize;
|
|
||||||
postscriptBuffer->maxlen = postscriptSize;
|
|
||||||
postscriptBuffer->data = (char *) postscriptData;
|
|
||||||
|
|
||||||
return postscriptBuffer;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* SerializeTableFooter serializes the given table footer and returns the result
|
|
||||||
* as a StringInfo.
|
|
||||||
*/
|
|
||||||
StringInfo
|
|
||||||
SerializeTableFooter(TableFooter *tableFooter)
|
|
||||||
{
|
|
||||||
StringInfo tableFooterBuffer = NULL;
|
|
||||||
Protobuf__TableFooter protobufTableFooter = PROTOBUF__TABLE_FOOTER__INIT;
|
|
||||||
Protobuf__StripeMetadata **stripeMetadataArray = NULL;
|
|
||||||
ListCell *stripeMetadataCell = NULL;
|
|
||||||
uint8 *tableFooterData = NULL;
|
|
||||||
uint32 tableFooterSize = 0;
|
|
||||||
uint32 stripeIndex = 0;
|
|
||||||
|
|
||||||
List *stripeMetadataList = tableFooter->stripeMetadataList;
|
|
||||||
uint32 stripeCount = list_length(stripeMetadataList);
|
|
||||||
stripeMetadataArray = palloc0(stripeCount * sizeof(Protobuf__StripeMetadata *));
|
|
||||||
|
|
||||||
foreach(stripeMetadataCell, stripeMetadataList)
|
|
||||||
{
|
|
||||||
StripeMetadata *stripeMetadata = lfirst(stripeMetadataCell);
|
|
||||||
|
|
||||||
Protobuf__StripeMetadata *protobufStripeMetadata = NULL;
|
|
||||||
protobufStripeMetadata = palloc0(sizeof(Protobuf__StripeMetadata));
|
|
||||||
protobuf__stripe_metadata__init(protobufStripeMetadata);
|
|
||||||
protobufStripeMetadata->has_fileoffset = true;
|
|
||||||
protobufStripeMetadata->fileoffset = stripeMetadata->fileOffset;
|
|
||||||
protobufStripeMetadata->has_skiplistlength = true;
|
|
||||||
protobufStripeMetadata->skiplistlength = stripeMetadata->skipListLength;
|
|
||||||
protobufStripeMetadata->has_datalength = true;
|
|
||||||
protobufStripeMetadata->datalength = stripeMetadata->dataLength;
|
|
||||||
protobufStripeMetadata->has_footerlength = true;
|
|
||||||
protobufStripeMetadata->footerlength = stripeMetadata->footerLength;
|
|
||||||
protobufStripeMetadata->has_id = true;
|
|
||||||
protobufStripeMetadata->id = stripeMetadata->id;
|
|
||||||
|
|
||||||
stripeMetadataArray[stripeIndex] = protobufStripeMetadata;
|
|
||||||
stripeIndex++;
|
|
||||||
}
|
|
||||||
|
|
||||||
protobufTableFooter.n_stripemetadataarray = stripeCount;
|
|
||||||
protobufTableFooter.stripemetadataarray = stripeMetadataArray;
|
|
||||||
protobufTableFooter.has_blockrowcount = true;
|
|
||||||
protobufTableFooter.blockrowcount = tableFooter->blockRowCount;
|
|
||||||
|
|
||||||
tableFooterSize = protobuf__table_footer__get_packed_size(&protobufTableFooter);
|
|
||||||
tableFooterData = palloc0(tableFooterSize);
|
|
||||||
protobuf__table_footer__pack(&protobufTableFooter, tableFooterData);
|
|
||||||
|
|
||||||
tableFooterBuffer = palloc0(sizeof(StringInfoData));
|
|
||||||
tableFooterBuffer->len = tableFooterSize;
|
|
||||||
tableFooterBuffer->maxlen = tableFooterSize;
|
|
||||||
tableFooterBuffer->data = (char *) tableFooterData;
|
|
||||||
|
|
||||||
return tableFooterBuffer;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SerializeColumnSkipList serializes a column skip list, where the column skip
|
* SerializeColumnSkipList serializes a column skip list, where the column skip
|
||||||
* list includes all block skip nodes for that column. The function then returns
|
* list includes all block skip nodes for that column. The function then returns
|
||||||
|
@ -194,112 +102,6 @@ SerializeColumnSkipList(ColumnBlockSkipNode *blockSkipNodeArray, uint32 blockCou
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* DeserializePostScript deserializes the given postscript buffer and returns
|
|
||||||
* the size of table footer in tableFooterLength pointer.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
DeserializePostScript(StringInfo buffer, uint64 *tableFooterLength)
|
|
||||||
{
|
|
||||||
Protobuf__PostScript *protobufPostScript = NULL;
|
|
||||||
protobufPostScript = protobuf__post_script__unpack(NULL, buffer->len,
|
|
||||||
(uint8 *) buffer->data);
|
|
||||||
if (protobufPostScript == NULL)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("could not unpack column store"),
|
|
||||||
errdetail("invalid postscript buffer")));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (protobufPostScript->versionmajor != CSTORE_VERSION_MAJOR ||
|
|
||||||
protobufPostScript->versionminor > CSTORE_VERSION_MINOR)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("could not unpack column store"),
|
|
||||||
errdetail("invalid column store version number")));
|
|
||||||
}
|
|
||||||
else if (strncmp(protobufPostScript->magicnumber, CSTORE_MAGIC_NUMBER,
|
|
||||||
NAMEDATALEN) != 0)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("could not unpack column store"),
|
|
||||||
errdetail("invalid magic number")));
|
|
||||||
}
|
|
||||||
|
|
||||||
(*tableFooterLength) = protobufPostScript->tablefooterlength;
|
|
||||||
|
|
||||||
protobuf__post_script__free_unpacked(protobufPostScript, NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* DeserializeTableFooter deserializes the given buffer and returns the result as
|
|
||||||
* a TableFooter struct.
|
|
||||||
*/
|
|
||||||
TableFooter *
|
|
||||||
DeserializeTableFooter(StringInfo buffer)
|
|
||||||
{
|
|
||||||
TableFooter *tableFooter = NULL;
|
|
||||||
Protobuf__TableFooter *protobufTableFooter = NULL;
|
|
||||||
List *stripeMetadataList = NIL;
|
|
||||||
uint64 blockRowCount = 0;
|
|
||||||
uint32 stripeCount = 0;
|
|
||||||
uint32 stripeIndex = 0;
|
|
||||||
|
|
||||||
protobufTableFooter = protobuf__table_footer__unpack(NULL, buffer->len,
|
|
||||||
(uint8 *) buffer->data);
|
|
||||||
if (protobufTableFooter == NULL)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("could not unpack column store"),
|
|
||||||
errdetail("invalid table footer buffer")));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!protobufTableFooter->has_blockrowcount)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("could not unpack column store"),
|
|
||||||
errdetail("missing required table footer metadata fields")));
|
|
||||||
}
|
|
||||||
else if (protobufTableFooter->blockrowcount < BLOCK_ROW_COUNT_MINIMUM ||
|
|
||||||
protobufTableFooter->blockrowcount > BLOCK_ROW_COUNT_MAXIMUM)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("could not unpack column store"),
|
|
||||||
errdetail("invalid block row count")));
|
|
||||||
}
|
|
||||||
blockRowCount = protobufTableFooter->blockrowcount;
|
|
||||||
|
|
||||||
stripeCount = protobufTableFooter->n_stripemetadataarray;
|
|
||||||
for (stripeIndex = 0; stripeIndex < stripeCount; stripeIndex++)
|
|
||||||
{
|
|
||||||
StripeMetadata *stripeMetadata = NULL;
|
|
||||||
Protobuf__StripeMetadata *protobufStripeMetadata = NULL;
|
|
||||||
|
|
||||||
protobufStripeMetadata = protobufTableFooter->stripemetadataarray[stripeIndex];
|
|
||||||
if (!protobufStripeMetadata->has_fileoffset ||
|
|
||||||
!protobufStripeMetadata->has_skiplistlength ||
|
|
||||||
!protobufStripeMetadata->has_datalength ||
|
|
||||||
!protobufStripeMetadata->has_footerlength)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("could not unpack column store"),
|
|
||||||
errdetail("missing required stripe metadata fields")));
|
|
||||||
}
|
|
||||||
|
|
||||||
stripeMetadata = palloc0(sizeof(StripeMetadata));
|
|
||||||
stripeMetadata->fileOffset = protobufStripeMetadata->fileoffset;
|
|
||||||
stripeMetadata->skipListLength = protobufStripeMetadata->skiplistlength;
|
|
||||||
stripeMetadata->dataLength = protobufStripeMetadata->datalength;
|
|
||||||
stripeMetadata->footerLength = protobufStripeMetadata->footerlength;
|
|
||||||
stripeMetadata->id = protobufStripeMetadata->id;
|
|
||||||
|
|
||||||
stripeMetadataList = lappend(stripeMetadataList, stripeMetadata);
|
|
||||||
}
|
|
||||||
|
|
||||||
protobuf__table_footer__free_unpacked(protobufTableFooter, NULL);
|
|
||||||
|
|
||||||
tableFooter = palloc0(sizeof(TableFooter));
|
|
||||||
tableFooter->stripeMetadataList = stripeMetadataList;
|
|
||||||
tableFooter->blockRowCount = blockRowCount;
|
|
||||||
|
|
||||||
return tableFooter;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* DeserializeBlockCount deserializes the given column skip list buffer and
|
* DeserializeBlockCount deserializes the given column skip list buffer and
|
||||||
* returns the number of blocks in column skip list.
|
* returns the number of blocks in column skip list.
|
||||||
|
|
|
@ -15,15 +15,12 @@
|
||||||
#define CSTORE_SERIALIZATION_H
|
#define CSTORE_SERIALIZATION_H
|
||||||
|
|
||||||
/* Function declarations for metadata serialization */
|
/* Function declarations for metadata serialization */
|
||||||
extern StringInfo SerializePostScript(uint64 tableFooterLength);
|
|
||||||
extern StringInfo SerializeTableFooter(TableFooter *tableFooter);
|
|
||||||
extern StringInfo SerializeColumnSkipList(ColumnBlockSkipNode *blockSkipNodeArray,
|
extern StringInfo SerializeColumnSkipList(ColumnBlockSkipNode *blockSkipNodeArray,
|
||||||
uint32 blockCount, bool typeByValue,
|
uint32 blockCount, bool typeByValue,
|
||||||
int typeLength);
|
int typeLength);
|
||||||
|
|
||||||
/* Function declarations for metadata deserialization */
|
/* Function declarations for metadata deserialization */
|
||||||
extern void DeserializePostScript(StringInfo buffer, uint64 *tableFooterLength);
|
extern void DeserializePostScript(StringInfo buffer, uint64 *tableFooterLength);
|
||||||
extern TableFooter * DeserializeTableFooter(StringInfo buffer);
|
|
||||||
extern uint32 DeserializeBlockCount(StringInfo buffer);
|
extern uint32 DeserializeBlockCount(StringInfo buffer);
|
||||||
extern uint32 DeserializeRowCount(StringInfo buffer);
|
extern uint32 DeserializeRowCount(StringInfo buffer);
|
||||||
extern ColumnBlockSkipNode * DeserializeColumnSkipList(StringInfo buffer,
|
extern ColumnBlockSkipNode * DeserializeColumnSkipList(StringInfo buffer,
|
||||||
|
|
|
@ -13,13 +13,22 @@
|
||||||
#include "cstore_version_compat.h"
|
#include "cstore_version_compat.h"
|
||||||
|
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
|
#include "access/heapam.h"
|
||||||
#include "access/nbtree.h"
|
#include "access/nbtree.h"
|
||||||
#include "access/table.h"
|
#include "access/table.h"
|
||||||
|
#include "access/tableam.h"
|
||||||
#include "access/xact.h"
|
#include "access/xact.h"
|
||||||
#include "catalog/indexing.h"
|
#include "catalog/indexing.h"
|
||||||
#include "catalog/pg_namespace.h"
|
#include "catalog/pg_namespace.h"
|
||||||
#include "catalog/pg_collation.h"
|
#include "catalog/pg_collation.h"
|
||||||
|
#include "catalog/pg_type.h"
|
||||||
|
#include "catalog/namespace.h"
|
||||||
#include "commands/defrem.h"
|
#include "commands/defrem.h"
|
||||||
|
#include "commands/trigger.h"
|
||||||
|
#include "executor/executor.h"
|
||||||
|
#include "executor/spi.h"
|
||||||
|
#include "miscadmin.h"
|
||||||
|
#include "nodes/execnodes.h"
|
||||||
#include "lib/stringinfo.h"
|
#include "lib/stringinfo.h"
|
||||||
#include "optimizer/optimizer.h"
|
#include "optimizer/optimizer.h"
|
||||||
#include "port.h"
|
#include "port.h"
|
||||||
|
@ -31,11 +40,27 @@
|
||||||
|
|
||||||
#include "cstore_metadata_serialization.h"
|
#include "cstore_metadata_serialization.h"
|
||||||
|
|
||||||
|
/*
 * ModifyState pairs a relation with an executor state (EState). It is
 * returned by StartModifyRelation and passed to
 * InsertTupleAndEnforceConstraints, DeleteTupleAndEnforceConstraints and
 * FinishModifyRelation (see their prototypes in this file).
 */
typedef struct
|
||||||
|
{
|
||||||
|
Relation rel;
|
||||||
|
EState *estate;
|
||||||
|
} ModifyState;
|
||||||
|
|
||||||
static Oid CStoreStripeAttrRelationId(void);
|
static Oid CStoreStripeAttrRelationId(void);
|
||||||
static Oid CStoreStripeAttrIndexRelationId(void);
|
static Oid CStoreStripeAttrIndexRelationId(void);
|
||||||
static void InsertStripeAttrRow(Oid relid, uint64 stripe, AttrNumber attr,
|
static Oid CStoreStripesRelationId(void);
|
||||||
uint64 existsSize, uint64 valuesSize,
|
static Oid CStoreStripesIndexRelationId(void);
|
||||||
uint64 skiplistSize);
|
static Oid CStoreTablesRelationId(void);
|
||||||
|
static Oid CStoreTablesIndexRelationId(void);
|
||||||
|
static Oid CStoreNamespaceId(void);
|
||||||
|
static int TableBlockRowCount(Oid relid);
|
||||||
|
static void DeleteTableMetadataRowIfExists(Oid relid);
|
||||||
|
static ModifyState * StartModifyRelation(Relation rel);
|
||||||
|
static void InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values,
|
||||||
|
bool *nulls);
|
||||||
|
static void DeleteTupleAndEnforceConstraints(ModifyState *state, HeapTuple heapTuple);
|
||||||
|
static void FinishModifyRelation(ModifyState *state);
|
||||||
|
static EState * create_estate_for_relation(Relation rel);
|
||||||
|
|
||||||
/* constants for cstore_stripe_attr */
|
/* constants for cstore_stripe_attr */
|
||||||
#define Natts_cstore_stripe_attr 6
|
#define Natts_cstore_stripe_attr 6
|
||||||
|
@ -46,50 +71,244 @@ static void InsertStripeAttrRow(Oid relid, uint64 stripe, AttrNumber attr,
|
||||||
#define Anum_cstore_stripe_attr_value_size 5
|
#define Anum_cstore_stripe_attr_value_size 5
|
||||||
#define Anum_cstore_stripe_attr_skiplist_size 6
|
#define Anum_cstore_stripe_attr_skiplist_size 6
|
||||||
|
|
||||||
|
/* constants for cstore_tables */
|
||||||
|
#define Natts_cstore_tables 4
|
||||||
|
#define Anum_cstore_tables_relid 1
|
||||||
|
#define Anum_cstore_tables_block_row_count 2
|
||||||
|
#define Anum_cstore_tables_version_major 3
|
||||||
|
#define Anum_cstore_tables_version_minor 4
|
||||||
|
|
||||||
|
/* constants for cstore_stripes */
|
||||||
|
#define Natts_cstore_stripes 5
|
||||||
|
#define Anum_cstore_stripes_relid 1
|
||||||
|
#define Anum_cstore_stripes_stripe 2
|
||||||
|
#define Anum_cstore_stripes_file_offset 3
|
||||||
|
#define Anum_cstore_stripes_skiplist_length 4
|
||||||
|
#define Anum_cstore_stripes_data_length 5
|
||||||
|
|
||||||
|
/*
|
||||||
|
* InitCStoreTableMetadata adds a record for the given relation in cstore_table.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
InitCStoreTableMetadata(Oid relid, int blockRowCount)
|
||||||
|
{
|
||||||
|
Oid cstoreTablesOid = InvalidOid;
|
||||||
|
Relation cstoreTables = NULL;
|
||||||
|
ModifyState *modifyState = NULL;
|
||||||
|
|
||||||
|
bool nulls[Natts_cstore_tables] = { 0 };
|
||||||
|
Datum values[Natts_cstore_tables] = {
|
||||||
|
ObjectIdGetDatum(relid),
|
||||||
|
Int32GetDatum(blockRowCount),
|
||||||
|
Int32GetDatum(CSTORE_VERSION_MAJOR),
|
||||||
|
Int32GetDatum(CSTORE_VERSION_MINOR)
|
||||||
|
};
|
||||||
|
|
||||||
|
DeleteTableMetadataRowIfExists(relid);
|
||||||
|
|
||||||
|
cstoreTablesOid = CStoreTablesRelationId();
|
||||||
|
cstoreTables = heap_open(cstoreTablesOid, RowExclusiveLock);
|
||||||
|
|
||||||
|
modifyState = StartModifyRelation(cstoreTables);
|
||||||
|
InsertTupleAndEnforceConstraints(modifyState, values, nulls);
|
||||||
|
FinishModifyRelation(modifyState);
|
||||||
|
|
||||||
|
CommandCounterIncrement();
|
||||||
|
|
||||||
|
heap_close(cstoreTables, NoLock);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* InsertStripeMetadataRow adds a row to cstore_stripes.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe)
|
||||||
|
{
|
||||||
|
bool nulls[Natts_cstore_stripes] = { 0 };
|
||||||
|
Datum values[Natts_cstore_stripes] = {
|
||||||
|
ObjectIdGetDatum(relid),
|
||||||
|
Int64GetDatum(stripe->id),
|
||||||
|
Int64GetDatum(stripe->fileOffset),
|
||||||
|
Int64GetDatum(stripe->skipListLength),
|
||||||
|
Int64GetDatum(stripe->dataLength)
|
||||||
|
};
|
||||||
|
|
||||||
|
Oid cstoreStripesOid = CStoreStripesRelationId();
|
||||||
|
Relation cstoreStripes = heap_open(cstoreStripesOid, RowExclusiveLock);
|
||||||
|
|
||||||
|
ModifyState *modifyState = StartModifyRelation(cstoreStripes);
|
||||||
|
InsertTupleAndEnforceConstraints(modifyState, values, nulls);
|
||||||
|
FinishModifyRelation(modifyState);
|
||||||
|
|
||||||
|
CommandCounterIncrement();
|
||||||
|
|
||||||
|
heap_close(cstoreStripes, NoLock);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ReadTableMetadata constructs TableMetadata for a given relid by reading
|
||||||
|
* from cstore_tables and cstore_stripes.
|
||||||
|
*/
|
||||||
|
TableMetadata *
|
||||||
|
ReadTableMetadata(Oid relid)
|
||||||
|
{
|
||||||
|
Oid cstoreStripesOid = InvalidOid;
|
||||||
|
Relation cstoreStripes = NULL;
|
||||||
|
Relation index = NULL;
|
||||||
|
TupleDesc tupleDescriptor = NULL;
|
||||||
|
ScanKeyData scanKey[1];
|
||||||
|
SysScanDesc scanDescriptor = NULL;
|
||||||
|
HeapTuple heapTuple;
|
||||||
|
|
||||||
|
TableMetadata *tableMetadata = palloc0(sizeof(TableMetadata));
|
||||||
|
tableMetadata->blockRowCount = TableBlockRowCount(relid);
|
||||||
|
|
||||||
|
ScanKeyInit(&scanKey[0], Anum_cstore_stripes_relid,
|
||||||
|
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relid));
|
||||||
|
|
||||||
|
cstoreStripesOid = CStoreStripesRelationId();
|
||||||
|
cstoreStripes = heap_open(cstoreStripesOid, AccessShareLock);
|
||||||
|
index = index_open(CStoreStripesIndexRelationId(), AccessShareLock);
|
||||||
|
tupleDescriptor = RelationGetDescr(cstoreStripes);
|
||||||
|
|
||||||
|
scanDescriptor = systable_beginscan_ordered(cstoreStripes, index, NULL, 1, scanKey);
|
||||||
|
|
||||||
|
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
|
||||||
|
{
|
||||||
|
StripeMetadata *stripeMetadata = NULL;
|
||||||
|
Datum datumArray[Natts_cstore_stripes];
|
||||||
|
bool isNullArray[Natts_cstore_stripes];
|
||||||
|
|
||||||
|
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
||||||
|
|
||||||
|
stripeMetadata = palloc0(sizeof(StripeMetadata));
|
||||||
|
stripeMetadata->id = DatumGetInt64(datumArray[Anum_cstore_stripes_stripe - 1]);
|
||||||
|
stripeMetadata->fileOffset = DatumGetInt64(
|
||||||
|
datumArray[Anum_cstore_stripes_file_offset - 1]);
|
||||||
|
stripeMetadata->dataLength = DatumGetInt64(
|
||||||
|
datumArray[Anum_cstore_stripes_data_length - 1]);
|
||||||
|
stripeMetadata->skipListLength = DatumGetInt64(
|
||||||
|
datumArray[Anum_cstore_stripes_skiplist_length - 1]);
|
||||||
|
|
||||||
|
tableMetadata->stripeMetadataList = lappend(tableMetadata->stripeMetadataList,
|
||||||
|
stripeMetadata);
|
||||||
|
}
|
||||||
|
|
||||||
|
systable_endscan_ordered(scanDescriptor);
|
||||||
|
index_close(index, NoLock);
|
||||||
|
heap_close(cstoreStripes, NoLock);
|
||||||
|
|
||||||
|
return tableMetadata;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TableBlockRowCount returns block_row_count column from cstore_tables for a given relid.
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
TableBlockRowCount(Oid relid)
|
||||||
|
{
|
||||||
|
int blockRowCount = 0;
|
||||||
|
Oid cstoreTablesOid = InvalidOid;
|
||||||
|
Relation cstoreTables = NULL;
|
||||||
|
Relation index = NULL;
|
||||||
|
TupleDesc tupleDescriptor = NULL;
|
||||||
|
ScanKeyData scanKey[1];
|
||||||
|
SysScanDesc scanDescriptor = NULL;
|
||||||
|
HeapTuple heapTuple = NULL;
|
||||||
|
|
||||||
|
ScanKeyInit(&scanKey[0], Anum_cstore_tables_relid,
|
||||||
|
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relid));
|
||||||
|
|
||||||
|
cstoreTablesOid = CStoreTablesRelationId();
|
||||||
|
cstoreTables = heap_open(cstoreTablesOid, AccessShareLock);
|
||||||
|
index = index_open(CStoreTablesIndexRelationId(), AccessShareLock);
|
||||||
|
tupleDescriptor = RelationGetDescr(cstoreTables);
|
||||||
|
|
||||||
|
scanDescriptor = systable_beginscan_ordered(cstoreTables, index, NULL, 1, scanKey);
|
||||||
|
|
||||||
|
heapTuple = systable_getnext(scanDescriptor);
|
||||||
|
if (HeapTupleIsValid(heapTuple))
|
||||||
|
{
|
||||||
|
Datum datumArray[Natts_cstore_tables];
|
||||||
|
bool isNullArray[Natts_cstore_tables];
|
||||||
|
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
||||||
|
blockRowCount = DatumGetInt32(datumArray[Anum_cstore_tables_block_row_count - 1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
systable_endscan_ordered(scanDescriptor);
|
||||||
|
index_close(index, NoLock);
|
||||||
|
heap_close(cstoreTables, NoLock);
|
||||||
|
|
||||||
|
return blockRowCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* DeleteTableMetadataRowIfExists removes the row with given relid from cstore_stripes.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
DeleteTableMetadataRowIfExists(Oid relid)
|
||||||
|
{
|
||||||
|
Oid cstoreTablesOid = InvalidOid;
|
||||||
|
Relation cstoreTables = NULL;
|
||||||
|
Relation index = NULL;
|
||||||
|
ScanKeyData scanKey[1];
|
||||||
|
SysScanDesc scanDescriptor = NULL;
|
||||||
|
HeapTuple heapTuple = NULL;
|
||||||
|
|
||||||
|
ScanKeyInit(&scanKey[0], Anum_cstore_tables_relid,
|
||||||
|
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relid));
|
||||||
|
|
||||||
|
cstoreTablesOid = CStoreTablesRelationId();
|
||||||
|
cstoreTables = table_open(cstoreTablesOid, AccessShareLock);
|
||||||
|
index = index_open(CStoreTablesIndexRelationId(), AccessShareLock);
|
||||||
|
|
||||||
|
scanDescriptor = systable_beginscan_ordered(cstoreTables, index, NULL, 1, scanKey);
|
||||||
|
|
||||||
|
heapTuple = systable_getnext(scanDescriptor);
|
||||||
|
if (HeapTupleIsValid(heapTuple))
|
||||||
|
{
|
||||||
|
ModifyState *modifyState = StartModifyRelation(cstoreTables);
|
||||||
|
DeleteTupleAndEnforceConstraints(modifyState, heapTuple);
|
||||||
|
FinishModifyRelation(modifyState);
|
||||||
|
}
|
||||||
|
|
||||||
|
systable_endscan_ordered(scanDescriptor);
|
||||||
|
index_close(index, NoLock);
|
||||||
|
table_close(cstoreTables, NoLock);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SaveStripeFooter stores the given StripeFooter as cstore_stripe_attr records.
|
* SaveStripeFooter stores the given StripeFooter as cstore_stripe_attr records.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
SaveStripeFooter(Oid relid, uint64 stripe, StripeFooter *footer)
|
SaveStripeFooter(Oid relid, uint64 stripe, StripeFooter *footer)
|
||||||
{
|
{
|
||||||
|
Oid cstoreStripeAttrOid = CStoreStripeAttrRelationId();
|
||||||
|
Relation cstoreStripeAttrs = heap_open(cstoreStripeAttrOid, RowExclusiveLock);
|
||||||
|
|
||||||
|
ModifyState *modifyState = StartModifyRelation(cstoreStripeAttrs);
|
||||||
|
|
||||||
for (AttrNumber attr = 1; attr <= footer->columnCount; attr++)
|
for (AttrNumber attr = 1; attr <= footer->columnCount; attr++)
|
||||||
{
|
|
||||||
InsertStripeAttrRow(relid, stripe, attr,
|
|
||||||
footer->existsSizeArray[attr - 1],
|
|
||||||
footer->valueSizeArray[attr - 1],
|
|
||||||
footer->skipListSizeArray[attr - 1]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* InsertStripeAttrRow adds a row to cstore_stripe_attr.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
InsertStripeAttrRow(Oid relid, uint64 stripe, AttrNumber attr,
|
|
||||||
uint64 existsSize, uint64 valuesSize,
|
|
||||||
uint64 skiplistSize)
|
|
||||||
{
|
{
|
||||||
bool nulls[Natts_cstore_stripe_attr] = { 0 };
|
bool nulls[Natts_cstore_stripe_attr] = { 0 };
|
||||||
Datum values[Natts_cstore_stripe_attr] = {
|
Datum values[Natts_cstore_stripe_attr] = {
|
||||||
ObjectIdGetDatum(relid),
|
ObjectIdGetDatum(relid),
|
||||||
Int64GetDatum(stripe),
|
Int64GetDatum(stripe),
|
||||||
Int16GetDatum(attr),
|
Int16GetDatum(attr),
|
||||||
Int64GetDatum(existsSize),
|
Int64GetDatum(footer->existsSizeArray[attr - 1]),
|
||||||
Int64GetDatum(valuesSize),
|
Int64GetDatum(footer->valueSizeArray[attr - 1]),
|
||||||
Int64GetDatum(skiplistSize)
|
Int64GetDatum(footer->skipListSizeArray[attr - 1])
|
||||||
};
|
};
|
||||||
|
|
||||||
Oid cstoreStripeAttrOid = CStoreStripeAttrRelationId();
|
InsertTupleAndEnforceConstraints(modifyState, values, nulls);
|
||||||
Relation cstoreStripeAttrs = heap_open(cstoreStripeAttrOid, RowExclusiveLock);
|
}
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(cstoreStripeAttrs);
|
|
||||||
|
|
||||||
HeapTuple tuple = heap_form_tuple(tupleDescriptor, values, nulls);
|
|
||||||
|
|
||||||
CatalogTupleInsert(cstoreStripeAttrs, tuple);
|
|
||||||
|
|
||||||
CommandCounterIncrement();
|
|
||||||
|
|
||||||
|
FinishModifyRelation(modifyState);
|
||||||
heap_close(cstoreStripeAttrs, NoLock);
|
heap_close(cstoreStripeAttrs, NoLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,6 +383,118 @@ ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* StartModifyRelation allocates resources for modifications.
|
||||||
|
*/
|
||||||
|
static ModifyState *
|
||||||
|
StartModifyRelation(Relation rel)
|
||||||
|
{
|
||||||
|
ModifyState *modifyState = NULL;
|
||||||
|
EState *estate = create_estate_for_relation(rel);
|
||||||
|
|
||||||
|
/* ExecSimpleRelationInsert, ... require caller to open indexes */
|
||||||
|
ExecOpenIndices(estate->es_result_relation_info, false);
|
||||||
|
|
||||||
|
modifyState = palloc(sizeof(ModifyState));
|
||||||
|
modifyState->rel = rel;
|
||||||
|
modifyState->estate = estate;
|
||||||
|
|
||||||
|
return modifyState;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* InsertTupleAndEnforceConstraints inserts a tuple into a relation and makes
|
||||||
|
* sure constraints are enforced and indexes are updated.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values, bool *nulls)
|
||||||
|
{
|
||||||
|
TupleDesc tupleDescriptor = RelationGetDescr(state->rel);
|
||||||
|
HeapTuple tuple = heap_form_tuple(tupleDescriptor, values, nulls);
|
||||||
|
TupleTableSlot *slot = ExecInitExtraTupleSlot(state->estate, tupleDescriptor,
|
||||||
|
&TTSOpsHeapTuple);
|
||||||
|
ExecStoreHeapTuple(tuple, slot, false);
|
||||||
|
|
||||||
|
/* use ExecSimpleRelationInsert to enforce constraints */
|
||||||
|
ExecSimpleRelationInsert(state->estate, slot);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* DeleteTupleAndEnforceConstraints deletes a tuple from a relation and
|
||||||
|
* makes sure constraints (e.g. FK constraints) are enforced.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
DeleteTupleAndEnforceConstraints(ModifyState *state, HeapTuple heapTuple)
|
||||||
|
{
|
||||||
|
EState *estate = state->estate;
|
||||||
|
ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
|
||||||
|
|
||||||
|
ItemPointer tid = &(heapTuple->t_self);
|
||||||
|
simple_table_tuple_delete(state->rel, tid, estate->es_snapshot);
|
||||||
|
|
||||||
|
/* execute AFTER ROW DELETE Triggers to enforce constraints */
|
||||||
|
ExecARDeleteTriggers(estate, resultRelInfo, tid, NULL, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* FinishModifyRelation cleans up resources after modifications are done.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
FinishModifyRelation(ModifyState *state)
|
||||||
|
{
|
||||||
|
ExecCloseIndices(state->estate->es_result_relation_info);
|
||||||
|
|
||||||
|
AfterTriggerEndQuery(state->estate);
|
||||||
|
ExecCleanUpTriggerState(state->estate);
|
||||||
|
ExecResetTupleTable(state->estate->es_tupleTable, false);
|
||||||
|
FreeExecutorState(state->estate);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Based on a similar function from
|
||||||
|
* postgres/src/backend/replication/logical/worker.c.
|
||||||
|
*
|
||||||
|
* Executor state preparation for evaluation of constraint expressions,
|
||||||
|
* indexes and triggers.
|
||||||
|
*
|
||||||
|
* This is based on similar code in copy.c
|
||||||
|
*/
|
||||||
|
static EState *
|
||||||
|
create_estate_for_relation(Relation rel)
|
||||||
|
{
|
||||||
|
EState *estate;
|
||||||
|
ResultRelInfo *resultRelInfo;
|
||||||
|
RangeTblEntry *rte;
|
||||||
|
|
||||||
|
estate = CreateExecutorState();
|
||||||
|
|
||||||
|
rte = makeNode(RangeTblEntry);
|
||||||
|
rte->rtekind = RTE_RELATION;
|
||||||
|
rte->relid = RelationGetRelid(rel);
|
||||||
|
rte->relkind = rel->rd_rel->relkind;
|
||||||
|
rte->rellockmode = AccessShareLock;
|
||||||
|
ExecInitRangeTable(estate, list_make1(rte));
|
||||||
|
|
||||||
|
resultRelInfo = makeNode(ResultRelInfo);
|
||||||
|
InitResultRelInfo(resultRelInfo, rel, 1, NULL, 0);
|
||||||
|
|
||||||
|
estate->es_result_relations = resultRelInfo;
|
||||||
|
estate->es_num_result_relations = 1;
|
||||||
|
estate->es_result_relation_info = resultRelInfo;
|
||||||
|
|
||||||
|
estate->es_output_cid = GetCurrentCommandId(true);
|
||||||
|
|
||||||
|
/* Prepare to catch AFTER triggers. */
|
||||||
|
AfterTriggerBeginQuery();
|
||||||
|
|
||||||
|
return estate;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CStoreStripeAttrRelationId returns relation id of cstore_stripe_attr.
|
* CStoreStripeAttrRelationId returns relation id of cstore_stripe_attr.
|
||||||
* TODO: should we cache this similar to citus?
|
* TODO: should we cache this similar to citus?
|
||||||
|
@ -171,16 +502,71 @@ ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount)
|
||||||
static Oid
|
static Oid
|
||||||
CStoreStripeAttrRelationId(void)
|
CStoreStripeAttrRelationId(void)
|
||||||
{
|
{
|
||||||
return get_relname_relid("cstore_stripe_attr", PG_CATALOG_NAMESPACE);
|
return get_relname_relid("cstore_stripe_attr", CStoreNamespaceId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CStoreStripeAttrIndexRelationId returns relation id of cstore_stripe_attr_idx.
|
* CStoreStripeAttrIndexRelationId returns relation id of cstore_stripe_attr_pkey.
|
||||||
* TODO: should we cache this similar to citus?
|
* TODO: should we cache this similar to citus?
|
||||||
*/
|
*/
|
||||||
static Oid
|
static Oid
|
||||||
CStoreStripeAttrIndexRelationId(void)
|
CStoreStripeAttrIndexRelationId(void)
|
||||||
{
|
{
|
||||||
return get_relname_relid("cstore_stripe_attr_idx", PG_CATALOG_NAMESPACE);
|
return get_relname_relid("cstore_stripe_attr_pkey", CStoreNamespaceId());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CStoreStripesRelationId returns relation id of cstore_stripes.
|
||||||
|
* TODO: should we cache this similar to citus?
|
||||||
|
*/
|
||||||
|
static Oid
|
||||||
|
CStoreStripesRelationId(void)
|
||||||
|
{
|
||||||
|
return get_relname_relid("cstore_stripes", CStoreNamespaceId());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CStoreStripesIndexRelationId returns relation id of cstore_stripes_idx.
|
||||||
|
* TODO: should we cache this similar to citus?
|
||||||
|
*/
|
||||||
|
static Oid
|
||||||
|
CStoreStripesIndexRelationId(void)
|
||||||
|
{
|
||||||
|
return get_relname_relid("cstore_stripes_pkey", CStoreNamespaceId());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CStoreTablesRelationId returns relation id of cstore_tables.
|
||||||
|
* TODO: should we cache this similar to citus?
|
||||||
|
*/
|
||||||
|
static Oid
|
||||||
|
CStoreTablesRelationId(void)
|
||||||
|
{
|
||||||
|
return get_relname_relid("cstore_tables", CStoreNamespaceId());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CStoreTablesIndexRelationId returns relation id of cstore_tables_idx.
|
||||||
|
* TODO: should we cache this similar to citus?
|
||||||
|
*/
|
||||||
|
static Oid
|
||||||
|
CStoreTablesIndexRelationId(void)
|
||||||
|
{
|
||||||
|
return get_relname_relid("cstore_tables_pkey", CStoreNamespaceId());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CStoreNamespaceId returns namespace id of the schema we store cstore
|
||||||
|
* related tables.
|
||||||
|
*/
|
||||||
|
static Oid
|
||||||
|
CStoreNamespaceId(void)
|
||||||
|
{
|
||||||
|
return get_namespace_oid("cstore", false);
|
||||||
}
|
}
|
||||||
|
|
142
cstore_reader.c
142
cstore_reader.c
|
@ -82,7 +82,6 @@ static void DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex
|
||||||
TupleDesc tupleDescriptor);
|
TupleDesc tupleDescriptor);
|
||||||
static Datum ColumnDefaultValue(TupleConstr *tupleConstraints,
|
static Datum ColumnDefaultValue(TupleConstr *tupleConstraints,
|
||||||
Form_pg_attribute attributeForm);
|
Form_pg_attribute attributeForm);
|
||||||
static int64 FILESize(FILE *file);
|
|
||||||
static StringInfo ReadFromFile(FILE *file, uint64 offset, uint32 size);
|
static StringInfo ReadFromFile(FILE *file, uint64 offset, uint32 size);
|
||||||
static void ResetUncompressedBlockData(ColumnBlockData **blockDataArray,
|
static void ResetUncompressedBlockData(ColumnBlockData **blockDataArray,
|
||||||
uint32 columnCount);
|
uint32 columnCount);
|
||||||
|
@ -99,20 +98,14 @@ CStoreBeginRead(Oid relationId, const char *filename, TupleDesc tupleDescriptor,
|
||||||
List *projectedColumnList, List *whereClauseList)
|
List *projectedColumnList, List *whereClauseList)
|
||||||
{
|
{
|
||||||
TableReadState *readState = NULL;
|
TableReadState *readState = NULL;
|
||||||
TableFooter *tableFooter = NULL;
|
TableMetadata *tableMetadata = NULL;
|
||||||
FILE *tableFile = NULL;
|
FILE *tableFile = NULL;
|
||||||
MemoryContext stripeReadContext = NULL;
|
MemoryContext stripeReadContext = NULL;
|
||||||
uint32 columnCount = 0;
|
uint32 columnCount = 0;
|
||||||
bool *projectedColumnMask = NULL;
|
bool *projectedColumnMask = NULL;
|
||||||
ColumnBlockData **blockDataArray = NULL;
|
ColumnBlockData **blockDataArray = NULL;
|
||||||
|
|
||||||
StringInfo tableFooterFilename = makeStringInfo();
|
tableMetadata = ReadTableMetadata(relationId);
|
||||||
appendStringInfo(tableFooterFilename, "%s%s", filename, CSTORE_FOOTER_FILE_SUFFIX);
|
|
||||||
|
|
||||||
tableFooter = CStoreReadFooter(tableFooterFilename);
|
|
||||||
|
|
||||||
pfree(tableFooterFilename->data);
|
|
||||||
pfree(tableFooterFilename);
|
|
||||||
|
|
||||||
tableFile = AllocateFile(filename, PG_BINARY_R);
|
tableFile = AllocateFile(filename, PG_BINARY_R);
|
||||||
if (tableFile == NULL)
|
if (tableFile == NULL)
|
||||||
|
@ -134,12 +127,12 @@ CStoreBeginRead(Oid relationId, const char *filename, TupleDesc tupleDescriptor,
|
||||||
columnCount = tupleDescriptor->natts;
|
columnCount = tupleDescriptor->natts;
|
||||||
projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList);
|
projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList);
|
||||||
blockDataArray = CreateEmptyBlockDataArray(columnCount, projectedColumnMask,
|
blockDataArray = CreateEmptyBlockDataArray(columnCount, projectedColumnMask,
|
||||||
tableFooter->blockRowCount);
|
tableMetadata->blockRowCount);
|
||||||
|
|
||||||
readState = palloc0(sizeof(TableReadState));
|
readState = palloc0(sizeof(TableReadState));
|
||||||
readState->relationId = relationId;
|
readState->relationId = relationId;
|
||||||
readState->tableFile = tableFile;
|
readState->tableFile = tableFile;
|
||||||
readState->tableFooter = tableFooter;
|
readState->tableMetadata = tableMetadata;
|
||||||
readState->projectedColumnList = projectedColumnList;
|
readState->projectedColumnList = projectedColumnList;
|
||||||
readState->whereClauseList = whereClauseList;
|
readState->whereClauseList = whereClauseList;
|
||||||
readState->stripeBuffers = NULL;
|
readState->stripeBuffers = NULL;
|
||||||
|
@ -154,76 +147,6 @@ CStoreBeginRead(Oid relationId, const char *filename, TupleDesc tupleDescriptor,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CStoreReadFooter reads the cstore file footer from the given file. First, the
|
|
||||||
* function reads the last byte of the file as the postscript size. Then, the
|
|
||||||
* function reads the postscript. Last, the function reads and deserializes the
|
|
||||||
* footer.
|
|
||||||
*/
|
|
||||||
TableFooter *
|
|
||||||
CStoreReadFooter(StringInfo tableFooterFilename)
|
|
||||||
{
|
|
||||||
TableFooter *tableFooter = NULL;
|
|
||||||
FILE *tableFooterFile = NULL;
|
|
||||||
uint64 footerOffset = 0;
|
|
||||||
uint64 footerLength = 0;
|
|
||||||
StringInfo postscriptBuffer = NULL;
|
|
||||||
StringInfo postscriptSizeBuffer = NULL;
|
|
||||||
uint64 postscriptSizeOffset = 0;
|
|
||||||
uint8 postscriptSize = 0;
|
|
||||||
uint64 footerFileSize = 0;
|
|
||||||
uint64 postscriptOffset = 0;
|
|
||||||
StringInfo footerBuffer = NULL;
|
|
||||||
int freeResult = 0;
|
|
||||||
|
|
||||||
tableFooterFile = AllocateFile(tableFooterFilename->data, PG_BINARY_R);
|
|
||||||
if (tableFooterFile == NULL)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode_for_file_access(),
|
|
||||||
errmsg("could not open file \"%s\" for reading: %m",
|
|
||||||
tableFooterFilename->data),
|
|
||||||
errhint("Try copying in data to the table.")));
|
|
||||||
}
|
|
||||||
|
|
||||||
footerFileSize = FILESize(tableFooterFile);
|
|
||||||
if (footerFileSize < CSTORE_POSTSCRIPT_SIZE_LENGTH)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("invalid cstore file")));
|
|
||||||
}
|
|
||||||
|
|
||||||
postscriptSizeOffset = footerFileSize - CSTORE_POSTSCRIPT_SIZE_LENGTH;
|
|
||||||
postscriptSizeBuffer = ReadFromFile(tableFooterFile, postscriptSizeOffset,
|
|
||||||
CSTORE_POSTSCRIPT_SIZE_LENGTH);
|
|
||||||
memcpy(&postscriptSize, postscriptSizeBuffer->data, CSTORE_POSTSCRIPT_SIZE_LENGTH);
|
|
||||||
if (postscriptSize + CSTORE_POSTSCRIPT_SIZE_LENGTH > footerFileSize)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("invalid postscript size")));
|
|
||||||
}
|
|
||||||
|
|
||||||
postscriptOffset = footerFileSize - (CSTORE_POSTSCRIPT_SIZE_LENGTH + postscriptSize);
|
|
||||||
postscriptBuffer = ReadFromFile(tableFooterFile, postscriptOffset, postscriptSize);
|
|
||||||
|
|
||||||
DeserializePostScript(postscriptBuffer, &footerLength);
|
|
||||||
if (footerLength + postscriptSize + CSTORE_POSTSCRIPT_SIZE_LENGTH > footerFileSize)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("invalid footer size")));
|
|
||||||
}
|
|
||||||
|
|
||||||
footerOffset = postscriptOffset - footerLength;
|
|
||||||
footerBuffer = ReadFromFile(tableFooterFile, footerOffset, footerLength);
|
|
||||||
tableFooter = DeserializeTableFooter(footerBuffer);
|
|
||||||
|
|
||||||
freeResult = FreeFile(tableFooterFile);
|
|
||||||
if (freeResult != 0)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode_for_file_access(),
|
|
||||||
errmsg("could not close file: %m")));
|
|
||||||
}
|
|
||||||
|
|
||||||
return tableFooter;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CStoreReadNextRow tries to read a row from the cstore file. On success, it sets
|
* CStoreReadNextRow tries to read a row from the cstore file. On success, it sets
|
||||||
* column values and nulls, and returns true. If there are no more rows to read,
|
* column values and nulls, and returns true. If there are no more rows to read,
|
||||||
|
@ -234,7 +157,7 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu
|
||||||
{
|
{
|
||||||
uint32 blockIndex = 0;
|
uint32 blockIndex = 0;
|
||||||
uint32 blockRowIndex = 0;
|
uint32 blockRowIndex = 0;
|
||||||
TableFooter *tableFooter = readState->tableFooter;
|
TableMetadata *tableMetadata = readState->tableMetadata;
|
||||||
MemoryContext oldContext = NULL;
|
MemoryContext oldContext = NULL;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -247,7 +170,7 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu
|
||||||
{
|
{
|
||||||
StripeBuffers *stripeBuffers = NULL;
|
StripeBuffers *stripeBuffers = NULL;
|
||||||
StripeMetadata *stripeMetadata = NULL;
|
StripeMetadata *stripeMetadata = NULL;
|
||||||
List *stripeMetadataList = tableFooter->stripeMetadataList;
|
List *stripeMetadataList = tableMetadata->stripeMetadataList;
|
||||||
uint32 stripeCount = list_length(stripeMetadataList);
|
uint32 stripeCount = list_length(stripeMetadataList);
|
||||||
StripeFooter *stripeFooter = NULL;
|
StripeFooter *stripeFooter = NULL;
|
||||||
|
|
||||||
|
@ -284,8 +207,8 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
blockIndex = readState->stripeReadRowCount / tableFooter->blockRowCount;
|
blockIndex = readState->stripeReadRowCount / tableMetadata->blockRowCount;
|
||||||
blockRowIndex = readState->stripeReadRowCount % tableFooter->blockRowCount;
|
blockRowIndex = readState->stripeReadRowCount % tableMetadata->blockRowCount;
|
||||||
|
|
||||||
if (blockIndex != readState->deserializedBlockIndex)
|
if (blockIndex != readState->deserializedBlockIndex)
|
||||||
{
|
{
|
||||||
|
@ -294,14 +217,14 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu
|
||||||
uint32 stripeRowCount = 0;
|
uint32 stripeRowCount = 0;
|
||||||
|
|
||||||
stripeRowCount = readState->stripeBuffers->rowCount;
|
stripeRowCount = readState->stripeBuffers->rowCount;
|
||||||
lastBlockIndex = stripeRowCount / tableFooter->blockRowCount;
|
lastBlockIndex = stripeRowCount / tableMetadata->blockRowCount;
|
||||||
if (blockIndex == lastBlockIndex)
|
if (blockIndex == lastBlockIndex)
|
||||||
{
|
{
|
||||||
blockRowCount = stripeRowCount % tableFooter->blockRowCount;
|
blockRowCount = stripeRowCount % tableMetadata->blockRowCount;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
blockRowCount = tableFooter->blockRowCount;
|
blockRowCount = tableMetadata->blockRowCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
oldContext = MemoryContextSwitchTo(readState->stripeReadContext);
|
oldContext = MemoryContextSwitchTo(readState->stripeReadContext);
|
||||||
|
@ -341,9 +264,9 @@ CStoreEndRead(TableReadState *readState)
|
||||||
|
|
||||||
MemoryContextDelete(readState->stripeReadContext);
|
MemoryContextDelete(readState->stripeReadContext);
|
||||||
FreeFile(readState->tableFile);
|
FreeFile(readState->tableFile);
|
||||||
list_free_deep(readState->tableFooter->stripeMetadataList);
|
list_free_deep(readState->tableMetadata->stripeMetadataList);
|
||||||
FreeColumnBlockDataArray(readState->blockDataArray, columnCount);
|
FreeColumnBlockDataArray(readState->blockDataArray, columnCount);
|
||||||
pfree(readState->tableFooter);
|
pfree(readState->tableMetadata);
|
||||||
pfree(readState);
|
pfree(readState);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -405,19 +328,12 @@ FreeColumnBlockDataArray(ColumnBlockData **blockDataArray, uint32 columnCount)
|
||||||
uint64
|
uint64
|
||||||
CStoreTableRowCount(Oid relid, const char *filename)
|
CStoreTableRowCount(Oid relid, const char *filename)
|
||||||
{
|
{
|
||||||
TableFooter *tableFooter = NULL;
|
TableMetadata *tableMetadata = NULL;
|
||||||
FILE *tableFile;
|
FILE *tableFile;
|
||||||
ListCell *stripeMetadataCell = NULL;
|
ListCell *stripeMetadataCell = NULL;
|
||||||
uint64 totalRowCount = 0;
|
uint64 totalRowCount = 0;
|
||||||
|
|
||||||
StringInfo tableFooterFilename = makeStringInfo();
|
tableMetadata = ReadTableMetadata(relid);
|
||||||
|
|
||||||
appendStringInfo(tableFooterFilename, "%s%s", filename, CSTORE_FOOTER_FILE_SUFFIX);
|
|
||||||
|
|
||||||
tableFooter = CStoreReadFooter(tableFooterFilename);
|
|
||||||
|
|
||||||
pfree(tableFooterFilename->data);
|
|
||||||
pfree(tableFooterFilename);
|
|
||||||
|
|
||||||
tableFile = AllocateFile(filename, PG_BINARY_R);
|
tableFile = AllocateFile(filename, PG_BINARY_R);
|
||||||
if (tableFile == NULL)
|
if (tableFile == NULL)
|
||||||
|
@ -426,7 +342,7 @@ CStoreTableRowCount(Oid relid, const char *filename)
|
||||||
errmsg("could not open file \"%s\" for reading: %m", filename)));
|
errmsg("could not open file \"%s\" for reading: %m", filename)));
|
||||||
}
|
}
|
||||||
|
|
||||||
foreach(stripeMetadataCell, tableFooter->stripeMetadataList)
|
foreach(stripeMetadataCell, tableMetadata->stripeMetadataList)
|
||||||
{
|
{
|
||||||
StripeMetadata *stripeMetadata = (StripeMetadata *) lfirst(stripeMetadataCell);
|
StripeMetadata *stripeMetadata = (StripeMetadata *) lfirst(stripeMetadataCell);
|
||||||
totalRowCount += StripeRowCount(relid, tableFile, stripeMetadata);
|
totalRowCount += StripeRowCount(relid, tableFile, stripeMetadata);
|
||||||
|
@ -1263,32 +1179,6 @@ ColumnDefaultValue(TupleConstr *tupleConstraints, Form_pg_attribute attributeFor
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* Returns the size of the given file handle. */
|
|
||||||
static int64
|
|
||||||
FILESize(FILE *file)
|
|
||||||
{
|
|
||||||
int64 fileSize = 0;
|
|
||||||
int fseekResult = 0;
|
|
||||||
|
|
||||||
errno = 0;
|
|
||||||
fseekResult = fseeko(file, 0, SEEK_END);
|
|
||||||
if (fseekResult != 0)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode_for_file_access(),
|
|
||||||
errmsg("could not seek in file: %m")));
|
|
||||||
}
|
|
||||||
|
|
||||||
fileSize = ftello(file);
|
|
||||||
if (fileSize == -1)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode_for_file_access(),
|
|
||||||
errmsg("could not get position in file: %m")));
|
|
||||||
}
|
|
||||||
|
|
||||||
return fileSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* Reads the given segment from the given file. */
|
/* Reads the given segment from the given file. */
|
||||||
static StringInfo
|
static StringInfo
|
||||||
ReadFromFile(FILE *file, uint64 offset, uint32 size)
|
ReadFromFile(FILE *file, uint64 offset, uint32 size)
|
||||||
|
|
129
cstore_writer.c
129
cstore_writer.c
|
@ -27,7 +27,6 @@
|
||||||
#include "cstore_metadata_serialization.h"
|
#include "cstore_metadata_serialization.h"
|
||||||
#include "cstore_version_compat.h"
|
#include "cstore_version_compat.h"
|
||||||
|
|
||||||
static void CStoreWriteFooter(StringInfo footerFileName, TableFooter *tableFooter);
|
|
||||||
static StripeBuffers * CreateEmptyStripeBuffers(uint32 stripeMaxRowCount,
|
static StripeBuffers * CreateEmptyStripeBuffers(uint32 stripeMaxRowCount,
|
||||||
uint32 blockRowCount,
|
uint32 blockRowCount,
|
||||||
uint32 columnCount);
|
uint32 columnCount);
|
||||||
|
@ -50,7 +49,7 @@ static void UpdateBlockSkipNodeMinMax(ColumnBlockSkipNode *blockSkipNode,
|
||||||
int columnTypeLength, Oid columnCollation,
|
int columnTypeLength, Oid columnCollation,
|
||||||
FmgrInfo *comparisonFunction);
|
FmgrInfo *comparisonFunction);
|
||||||
static Datum DatumCopy(Datum datum, bool datumTypeByValue, int datumTypeLength);
|
static Datum DatumCopy(Datum datum, bool datumTypeByValue, int datumTypeLength);
|
||||||
static void AppendStripeMetadata(TableFooter *tableFooter,
|
static void AppendStripeMetadata(TableMetadata *tableMetadata,
|
||||||
StripeMetadata stripeMetadata);
|
StripeMetadata stripeMetadata);
|
||||||
static void WriteToFile(FILE *file, void *data, uint32 dataLength);
|
static void WriteToFile(FILE *file, void *data, uint32 dataLength);
|
||||||
static void SyncAndCloseFile(FILE *file);
|
static void SyncAndCloseFile(FILE *file);
|
||||||
|
@ -72,26 +71,17 @@ CStoreBeginWrite(Oid relationId,
|
||||||
{
|
{
|
||||||
TableWriteState *writeState = NULL;
|
TableWriteState *writeState = NULL;
|
||||||
FILE *tableFile = NULL;
|
FILE *tableFile = NULL;
|
||||||
StringInfo tableFooterFilename = NULL;
|
TableMetadata *tableMetadata = NULL;
|
||||||
TableFooter *tableFooter = NULL;
|
|
||||||
FmgrInfo **comparisonFunctionArray = NULL;
|
FmgrInfo **comparisonFunctionArray = NULL;
|
||||||
MemoryContext stripeWriteContext = NULL;
|
MemoryContext stripeWriteContext = NULL;
|
||||||
uint64 currentFileOffset = 0;
|
uint64 currentFileOffset = 0;
|
||||||
uint32 columnCount = 0;
|
uint32 columnCount = 0;
|
||||||
uint32 columnIndex = 0;
|
uint32 columnIndex = 0;
|
||||||
struct stat statBuffer;
|
|
||||||
int statResult = 0;
|
|
||||||
bool *columnMaskArray = NULL;
|
bool *columnMaskArray = NULL;
|
||||||
ColumnBlockData **blockData = NULL;
|
ColumnBlockData **blockData = NULL;
|
||||||
uint64 currentStripeId = 0;
|
uint64 currentStripeId = 0;
|
||||||
|
|
||||||
tableFooterFilename = makeStringInfo();
|
tableFile = AllocateFile(filename, "a+");
|
||||||
appendStringInfo(tableFooterFilename, "%s%s", filename, CSTORE_FOOTER_FILE_SUFFIX);
|
|
||||||
|
|
||||||
statResult = stat(tableFooterFilename->data, &statBuffer);
|
|
||||||
if (statResult < 0)
|
|
||||||
{
|
|
||||||
tableFile = AllocateFile(filename, "w");
|
|
||||||
if (tableFile == NULL)
|
if (tableFile == NULL)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode_for_file_access(),
|
ereport(ERROR, (errcode_for_file_access(),
|
||||||
|
@ -99,34 +89,19 @@ CStoreBeginWrite(Oid relationId,
|
||||||
filename)));
|
filename)));
|
||||||
}
|
}
|
||||||
|
|
||||||
tableFooter = palloc0(sizeof(TableFooter));
|
tableMetadata = ReadTableMetadata(relationId);
|
||||||
tableFooter->blockRowCount = blockRowCount;
|
|
||||||
tableFooter->stripeMetadataList = NIL;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
tableFile = AllocateFile(filename, "r+");
|
|
||||||
if (tableFile == NULL)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode_for_file_access(),
|
|
||||||
errmsg("could not open file \"%s\" for writing: %m",
|
|
||||||
filename)));
|
|
||||||
}
|
|
||||||
|
|
||||||
tableFooter = CStoreReadFooter(tableFooterFilename);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If stripeMetadataList is not empty, jump to the position right after
|
* If stripeMetadataList is not empty, jump to the position right after
|
||||||
* the last position.
|
* the last position.
|
||||||
*/
|
*/
|
||||||
if (tableFooter->stripeMetadataList != NIL)
|
if (tableMetadata->stripeMetadataList != NIL)
|
||||||
{
|
{
|
||||||
StripeMetadata *lastStripe = NULL;
|
StripeMetadata *lastStripe = NULL;
|
||||||
uint64 lastStripeSize = 0;
|
uint64 lastStripeSize = 0;
|
||||||
int fseekResult = 0;
|
int fseekResult = 0;
|
||||||
|
|
||||||
lastStripe = llast(tableFooter->stripeMetadataList);
|
lastStripe = llast(tableMetadata->stripeMetadataList);
|
||||||
lastStripeSize += lastStripe->skipListLength;
|
lastStripeSize += lastStripe->skipListLength;
|
||||||
lastStripeSize += lastStripe->dataLength;
|
lastStripeSize += lastStripe->dataLength;
|
||||||
lastStripeSize += lastStripe->footerLength;
|
lastStripeSize += lastStripe->footerLength;
|
||||||
|
@ -180,8 +155,7 @@ CStoreBeginWrite(Oid relationId,
|
||||||
writeState = palloc0(sizeof(TableWriteState));
|
writeState = palloc0(sizeof(TableWriteState));
|
||||||
writeState->relationId = relationId;
|
writeState->relationId = relationId;
|
||||||
writeState->tableFile = tableFile;
|
writeState->tableFile = tableFile;
|
||||||
writeState->tableFooterFilename = tableFooterFilename;
|
writeState->tableMetadata = tableMetadata;
|
||||||
writeState->tableFooter = tableFooter;
|
|
||||||
writeState->compressionType = compressionType;
|
writeState->compressionType = compressionType;
|
||||||
writeState->stripeMaxRowCount = stripeMaxRowCount;
|
writeState->stripeMaxRowCount = stripeMaxRowCount;
|
||||||
writeState->tupleDescriptor = tupleDescriptor;
|
writeState->tupleDescriptor = tupleDescriptor;
|
||||||
|
@ -215,8 +189,8 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul
|
||||||
StripeBuffers *stripeBuffers = writeState->stripeBuffers;
|
StripeBuffers *stripeBuffers = writeState->stripeBuffers;
|
||||||
StripeSkipList *stripeSkipList = writeState->stripeSkipList;
|
StripeSkipList *stripeSkipList = writeState->stripeSkipList;
|
||||||
uint32 columnCount = writeState->tupleDescriptor->natts;
|
uint32 columnCount = writeState->tupleDescriptor->natts;
|
||||||
TableFooter *tableFooter = writeState->tableFooter;
|
TableMetadata *tableMetadata = writeState->tableMetadata;
|
||||||
const uint32 blockRowCount = tableFooter->blockRowCount;
|
const uint32 blockRowCount = tableMetadata->blockRowCount;
|
||||||
ColumnBlockData **blockDataArray = writeState->blockDataArray;
|
ColumnBlockData **blockDataArray = writeState->blockDataArray;
|
||||||
MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext);
|
MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext);
|
||||||
|
|
||||||
|
@ -304,7 +278,8 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul
|
||||||
* doesn't free it.
|
* doesn't free it.
|
||||||
*/
|
*/
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
AppendStripeMetadata(tableFooter, stripeMetadata);
|
InsertStripeMetadataRow(writeState->relationId, &stripeMetadata);
|
||||||
|
AppendStripeMetadata(tableMetadata, stripeMetadata);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -322,9 +297,6 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul
|
||||||
void
|
void
|
||||||
CStoreEndWrite(TableWriteState *writeState)
|
CStoreEndWrite(TableWriteState *writeState)
|
||||||
{
|
{
|
||||||
StringInfo tableFooterFilename = NULL;
|
|
||||||
StringInfo tempTableFooterFileName = NULL;
|
|
||||||
int renameResult = 0;
|
|
||||||
int columnCount = writeState->tupleDescriptor->natts;
|
int columnCount = writeState->tupleDescriptor->natts;
|
||||||
StripeBuffers *stripeBuffers = writeState->stripeBuffers;
|
StripeBuffers *stripeBuffers = writeState->stripeBuffers;
|
||||||
|
|
||||||
|
@ -336,85 +308,20 @@ CStoreEndWrite(TableWriteState *writeState)
|
||||||
MemoryContextReset(writeState->stripeWriteContext);
|
MemoryContextReset(writeState->stripeWriteContext);
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
AppendStripeMetadata(writeState->tableFooter, stripeMetadata);
|
InsertStripeMetadataRow(writeState->relationId, &stripeMetadata);
|
||||||
|
AppendStripeMetadata(writeState->tableMetadata, stripeMetadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncAndCloseFile(writeState->tableFile);
|
SyncAndCloseFile(writeState->tableFile);
|
||||||
|
|
||||||
tableFooterFilename = writeState->tableFooterFilename;
|
|
||||||
tempTableFooterFileName = makeStringInfo();
|
|
||||||
appendStringInfo(tempTableFooterFileName, "%s%s", tableFooterFilename->data,
|
|
||||||
CSTORE_TEMP_FILE_SUFFIX);
|
|
||||||
|
|
||||||
CStoreWriteFooter(tempTableFooterFileName, writeState->tableFooter);
|
|
||||||
|
|
||||||
renameResult = rename(tempTableFooterFileName->data, tableFooterFilename->data);
|
|
||||||
if (renameResult != 0)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode_for_file_access(),
|
|
||||||
errmsg("could not rename file \"%s\" to \"%s\": %m",
|
|
||||||
tempTableFooterFileName->data,
|
|
||||||
tableFooterFilename->data)));
|
|
||||||
}
|
|
||||||
|
|
||||||
pfree(tempTableFooterFileName->data);
|
|
||||||
pfree(tempTableFooterFileName);
|
|
||||||
|
|
||||||
MemoryContextDelete(writeState->stripeWriteContext);
|
MemoryContextDelete(writeState->stripeWriteContext);
|
||||||
list_free_deep(writeState->tableFooter->stripeMetadataList);
|
list_free_deep(writeState->tableMetadata->stripeMetadataList);
|
||||||
pfree(writeState->tableFooter);
|
|
||||||
pfree(writeState->tableFooterFilename->data);
|
|
||||||
pfree(writeState->tableFooterFilename);
|
|
||||||
pfree(writeState->comparisonFunctionArray);
|
pfree(writeState->comparisonFunctionArray);
|
||||||
FreeColumnBlockDataArray(writeState->blockDataArray, columnCount);
|
FreeColumnBlockDataArray(writeState->blockDataArray, columnCount);
|
||||||
pfree(writeState);
|
pfree(writeState);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CStoreWriteFooter writes the given footer to given file. First, the function
|
|
||||||
* serializes and writes the footer to the file. Then, the function serializes
|
|
||||||
* and writes the postscript. Then, the function writes the postscript size as
|
|
||||||
* the last byte of the file. Last, the function syncs and closes the footer file.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
CStoreWriteFooter(StringInfo tableFooterFilename, TableFooter *tableFooter)
|
|
||||||
{
|
|
||||||
FILE *tableFooterFile = NULL;
|
|
||||||
StringInfo tableFooterBuffer = NULL;
|
|
||||||
StringInfo postscriptBuffer = NULL;
|
|
||||||
uint8 postscriptSize = 0;
|
|
||||||
|
|
||||||
tableFooterFile = AllocateFile(tableFooterFilename->data, PG_BINARY_W);
|
|
||||||
if (tableFooterFile == NULL)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode_for_file_access(),
|
|
||||||
errmsg("could not open file \"%s\" for writing: %m",
|
|
||||||
tableFooterFilename->data)));
|
|
||||||
}
|
|
||||||
|
|
||||||
/* write the footer */
|
|
||||||
tableFooterBuffer = SerializeTableFooter(tableFooter);
|
|
||||||
WriteToFile(tableFooterFile, tableFooterBuffer->data, tableFooterBuffer->len);
|
|
||||||
|
|
||||||
/* write the postscript */
|
|
||||||
postscriptBuffer = SerializePostScript(tableFooterBuffer->len);
|
|
||||||
WriteToFile(tableFooterFile, postscriptBuffer->data, postscriptBuffer->len);
|
|
||||||
|
|
||||||
/* write the 1-byte postscript size */
|
|
||||||
Assert(postscriptBuffer->len < CSTORE_POSTSCRIPT_SIZE_MAX);
|
|
||||||
postscriptSize = postscriptBuffer->len;
|
|
||||||
WriteToFile(tableFooterFile, &postscriptSize, CSTORE_POSTSCRIPT_SIZE_LENGTH);
|
|
||||||
|
|
||||||
SyncAndCloseFile(tableFooterFile);
|
|
||||||
|
|
||||||
pfree(tableFooterBuffer->data);
|
|
||||||
pfree(tableFooterBuffer);
|
|
||||||
pfree(postscriptBuffer->data);
|
|
||||||
pfree(postscriptBuffer);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CreateEmptyStripeBuffers allocates an empty StripeBuffers structure with the given
|
* CreateEmptyStripeBuffers allocates an empty StripeBuffers structure with the given
|
||||||
* column count.
|
* column count.
|
||||||
|
@ -501,7 +408,7 @@ FlushStripe(TableWriteState *writeState)
|
||||||
StripeFooter *stripeFooter = NULL;
|
StripeFooter *stripeFooter = NULL;
|
||||||
uint32 columnIndex = 0;
|
uint32 columnIndex = 0;
|
||||||
uint32 blockIndex = 0;
|
uint32 blockIndex = 0;
|
||||||
TableFooter *tableFooter = writeState->tableFooter;
|
TableMetadata *tableMetadata = writeState->tableMetadata;
|
||||||
FILE *tableFile = writeState->tableFile;
|
FILE *tableFile = writeState->tableFile;
|
||||||
StripeBuffers *stripeBuffers = writeState->stripeBuffers;
|
StripeBuffers *stripeBuffers = writeState->stripeBuffers;
|
||||||
StripeSkipList *stripeSkipList = writeState->stripeSkipList;
|
StripeSkipList *stripeSkipList = writeState->stripeSkipList;
|
||||||
|
@ -509,7 +416,7 @@ FlushStripe(TableWriteState *writeState)
|
||||||
TupleDesc tupleDescriptor = writeState->tupleDescriptor;
|
TupleDesc tupleDescriptor = writeState->tupleDescriptor;
|
||||||
uint32 columnCount = tupleDescriptor->natts;
|
uint32 columnCount = tupleDescriptor->natts;
|
||||||
uint32 blockCount = stripeSkipList->blockCount;
|
uint32 blockCount = stripeSkipList->blockCount;
|
||||||
uint32 blockRowCount = tableFooter->blockRowCount;
|
uint32 blockRowCount = tableMetadata->blockRowCount;
|
||||||
uint32 lastBlockIndex = stripeBuffers->rowCount / blockRowCount;
|
uint32 lastBlockIndex = stripeBuffers->rowCount / blockRowCount;
|
||||||
uint32 lastBlockRowCount = stripeBuffers->rowCount % blockRowCount;
|
uint32 lastBlockRowCount = stripeBuffers->rowCount % blockRowCount;
|
||||||
|
|
||||||
|
@ -918,12 +825,12 @@ DatumCopy(Datum datum, bool datumTypeByValue, int datumTypeLength)
|
||||||
* table footer's stripeMetadataList.
|
* table footer's stripeMetadataList.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
AppendStripeMetadata(TableFooter *tableFooter, StripeMetadata stripeMetadata)
|
AppendStripeMetadata(TableMetadata *tableMetadata, StripeMetadata stripeMetadata)
|
||||||
{
|
{
|
||||||
StripeMetadata *stripeMetadataCopy = palloc0(sizeof(StripeMetadata));
|
StripeMetadata *stripeMetadataCopy = palloc0(sizeof(StripeMetadata));
|
||||||
memcpy(stripeMetadataCopy, &stripeMetadata, sizeof(StripeMetadata));
|
memcpy(stripeMetadataCopy, &stripeMetadata, sizeof(StripeMetadata));
|
||||||
|
|
||||||
tableFooter->stripeMetadataList = lappend(tableFooter->stripeMetadataList,
|
tableMetadata->stripeMetadataList = lappend(tableMetadata->stripeMetadataList,
|
||||||
stripeMetadataCopy);
|
stripeMetadataCopy);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -72,7 +72,7 @@ SELECT count(*) FROM cstore_truncate_test_compressed;
|
||||||
SELECT cstore_table_size('cstore_truncate_test_compressed');
|
SELECT cstore_table_size('cstore_truncate_test_compressed');
|
||||||
cstore_table_size
|
cstore_table_size
|
||||||
-------------------
|
-------------------
|
||||||
26
|
0
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- make sure data files still present
|
-- make sure data files still present
|
||||||
|
@ -82,7 +82,7 @@ SELECT count(*) FROM (
|
||||||
) AS q1) AS q2;
|
) AS q1) AS q2;
|
||||||
count
|
count
|
||||||
-------
|
-------
|
||||||
6
|
3
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
INSERT INTO cstore_truncate_test select a, a from generate_series(1, 10) a;
|
INSERT INTO cstore_truncate_test select a, a from generate_series(1, 10) a;
|
||||||
|
|
Loading…
Reference in New Issue