diff --git a/Makefile b/Makefile index 72daebc55..bd3ae77ce 100644 --- a/Makefile +++ b/Makefile @@ -7,8 +7,8 @@ MODULE_big = cstore_fdw PG_CPPFLAGS = --std=c99 SHLIB_LINK = -lprotobuf-c -OBJS = cstore.pb-c.o cstore_fdw.o cstore_writer.o cstore_reader.o \ - cstore_metadata_serialization.o cstore_compression.o +OBJS = cstore.pb-c.o cstore.o cstore_fdw.o cstore_writer.o cstore_reader.o \ + cstore_metadata_serialization.o cstore_compression.o mod.o EXTENSION = cstore_fdw DATA = cstore_fdw--1.7.sql cstore_fdw--1.6--1.7.sql cstore_fdw--1.5--1.6.sql cstore_fdw--1.4--1.5.sql \ diff --git a/cstore.c b/cstore.c new file mode 100644 index 000000000..ccb59675f --- /dev/null +++ b/cstore.c @@ -0,0 +1,170 @@ +/*------------------------------------------------------------------------- + * + * cstore.c + * + * This file contains common functions for cstore tables (option parsing, directory and file setup). + * + * Copyright (c) 2016, Citus Data, Inc. + * + * $Id$ + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "miscadmin.h" + +#include "cstore.h" + +#include <sys/stat.h> + +static void CreateDirectory(StringInfo directoryName); +static bool DirectoryExists(StringInfo directoryName); + +/* ParseCompressionType converts a string to a compression type. */ +CompressionType +ParseCompressionType(const char *compressionTypeString) +{ + CompressionType compressionType = COMPRESSION_TYPE_INVALID; + Assert(compressionTypeString != NULL); + + if (strncmp(compressionTypeString, COMPRESSION_STRING_NONE, NAMEDATALEN) == 0) + { + compressionType = COMPRESSION_NONE; + } + else if (strncmp(compressionTypeString, COMPRESSION_STRING_PG_LZ, NAMEDATALEN) == 0) + { + compressionType = COMPRESSION_PG_LZ; + } + + return compressionType; +} + +/* CreateDirectory creates a new directory with the given directory name. 
*/ +static void +CreateDirectory(StringInfo directoryName) +{ + int makeOK = mkdir(directoryName->data, S_IRWXU); + if (makeOK != 0) + { + ereport(ERROR, (errcode_for_file_access(), + errmsg("could not create directory \"%s\": %m", + directoryName->data))); + } +} + +/* DirectoryExists checks if a directory exists for the given directory name. */ +static bool +DirectoryExists(StringInfo directoryName) +{ + bool directoryExists = true; + struct stat directoryStat; + + int statOK = stat(directoryName->data, &directoryStat); + if (statOK == 0) + { + /* file already exists; check that it is a directory */ + if (!S_ISDIR(directoryStat.st_mode)) + { + ereport(ERROR, (errmsg("\"%s\" is not a directory", directoryName->data), + errhint("You need to remove or rename the file \"%s\".", + directoryName->data))); + } + } + else + { + if (errno == ENOENT) + { + directoryExists = false; + } + else + { + ereport(ERROR, (errcode_for_file_access(), + errmsg("could not stat directory \"%s\": %m", + directoryName->data))); + } + } + + return directoryExists; +} + +/* + * RemoveCStoreDatabaseDirectory removes CStore directory previously + * created for this database. + * However it does not remove 'cstore_fdw' directory even if there + * are no other databases left. + */ +void +RemoveCStoreDatabaseDirectory(Oid databaseOid) +{ + StringInfo cstoreDirectoryPath = makeStringInfo(); + StringInfo cstoreDatabaseDirectoryPath = makeStringInfo(); + + appendStringInfo(cstoreDirectoryPath, "%s/%s", DataDir, CSTORE_FDW_NAME); + + appendStringInfo(cstoreDatabaseDirectoryPath, "%s/%s/%u", DataDir, + CSTORE_FDW_NAME, databaseOid); + + if (DirectoryExists(cstoreDatabaseDirectoryPath)) + { + rmtree(cstoreDatabaseDirectoryPath->data, true); + } +} + + +/* + * InitializeCStoreTableFile creates data and footer file for a cstore table. + * The function assumes data and footer files do not exist, therefore + * it should be called on empty or non-existing table. 
Notice that the caller + * is expected to acquire AccessExclusiveLock on the relation. + */ +void +InitializeCStoreTableFile(Oid relationId, Relation relation, CStoreOptions *cstoreOptions) +{ + TableWriteState *writeState = NULL; + TupleDesc tupleDescriptor = RelationGetDescr(relation); + + /* + * Initialize state to write to the cstore file. This creates an + * empty data file and a valid footer file for the table. + */ + writeState = CStoreBeginWrite(cstoreOptions->filename, + cstoreOptions->compressionType, cstoreOptions->stripeRowCount, + cstoreOptions->blockRowCount, tupleDescriptor); + CStoreEndWrite(writeState); +} + + +/* + * CreateCStoreDatabaseDirectory creates the directory (and parent directories, + * if needed) used to store automatically managed cstore_fdw files. The path to + * the directory is $PGDATA/cstore_fdw/{databaseOid}. + */ +void +CreateCStoreDatabaseDirectory(Oid databaseOid) +{ + bool cstoreDirectoryExists = false; + bool databaseDirectoryExists = false; + StringInfo cstoreDatabaseDirectoryPath = NULL; + + StringInfo cstoreDirectoryPath = makeStringInfo(); + appendStringInfo(cstoreDirectoryPath, "%s/%s", DataDir, CSTORE_FDW_NAME); + + cstoreDirectoryExists = DirectoryExists(cstoreDirectoryPath); + if (!cstoreDirectoryExists) + { + CreateDirectory(cstoreDirectoryPath); + } + + cstoreDatabaseDirectoryPath = makeStringInfo(); + appendStringInfo(cstoreDatabaseDirectoryPath, "%s/%s/%u", DataDir, + CSTORE_FDW_NAME, databaseOid); + + databaseDirectoryExists = DirectoryExists(cstoreDatabaseDirectoryPath); + if (!databaseDirectoryExists) + { + CreateDirectory(cstoreDatabaseDirectoryPath); + } +} + diff --git a/cstore.h b/cstore.h new file mode 100644 index 000000000..f51a972e2 --- /dev/null +++ b/cstore.h @@ -0,0 +1,311 @@ +/*------------------------------------------------------------------------- + * + * cstore.h + * + * Type and function declarations for CStore + * + * Copyright (c) 2016, Citus Data, Inc. 
+ * + * $Id$ + * + *------------------------------------------------------------------------- + */ + +#ifndef CSTORE_H +#define CSTORE_H + +#include "access/tupdesc.h" +#include "fmgr.h" +#include "catalog/pg_am.h" +#include "catalog/pg_foreign_server.h" +#include "catalog/pg_foreign_table.h" +#include "lib/stringinfo.h" +#include "utils/rel.h" + +/* Defines for valid option names */ +#define OPTION_NAME_FILENAME "filename" +#define OPTION_NAME_COMPRESSION_TYPE "compression" +#define OPTION_NAME_STRIPE_ROW_COUNT "stripe_row_count" +#define OPTION_NAME_BLOCK_ROW_COUNT "block_row_count" + +/* Default values for option parameters */ +#define DEFAULT_COMPRESSION_TYPE COMPRESSION_NONE +#define DEFAULT_STRIPE_ROW_COUNT 150000 +#define DEFAULT_BLOCK_ROW_COUNT 10000 + +/* Limits for option parameters */ +#define STRIPE_ROW_COUNT_MINIMUM 1000 +#define STRIPE_ROW_COUNT_MAXIMUM 10000000 +#define BLOCK_ROW_COUNT_MINIMUM 1000 +#define BLOCK_ROW_COUNT_MAXIMUM 100000 + +/* String representations of compression types */ +#define COMPRESSION_STRING_NONE "none" +#define COMPRESSION_STRING_PG_LZ "pglz" + +/* CStore file signature */ +#define CSTORE_MAGIC_NUMBER "citus_cstore" +#define CSTORE_VERSION_MAJOR 1 +#define CSTORE_VERSION_MINOR 7 + +/* miscellaneous defines */ +#define CSTORE_FDW_NAME "cstore_fdw" +#define CSTORE_FOOTER_FILE_SUFFIX ".footer" +#define CSTORE_TEMP_FILE_SUFFIX ".tmp" +#define CSTORE_TUPLE_COST_MULTIPLIER 10 +#define CSTORE_POSTSCRIPT_SIZE_LENGTH 1 +#define CSTORE_POSTSCRIPT_SIZE_MAX 256 + +/* Enumeration for cstore file's compression method */ +typedef enum +{ + COMPRESSION_TYPE_INVALID = -1, + COMPRESSION_NONE = 0, + COMPRESSION_PG_LZ = 1, + + COMPRESSION_COUNT + +} CompressionType; + + +/* + * CStoreOptions holds the option values to be used when reading or writing + * a cstore file. To resolve these values, we first check foreign table's options, + * and if not present, we then fall back to the default values specified above. 
+ */ +typedef struct CStoreOptions +{ + char *filename; + CompressionType compressionType; + uint64 stripeRowCount; + uint32 blockRowCount; + +} CStoreOptions; + + +/* + * StripeMetadata represents information about a stripe. This information is + * stored in the cstore file's footer. + */ +typedef struct StripeMetadata +{ + uint64 fileOffset; + uint64 skipListLength; + uint64 dataLength; + uint64 footerLength; + +} StripeMetadata; + + +/* TableFooter represents the footer of a cstore file. */ +typedef struct TableFooter +{ + List *stripeMetadataList; + uint64 blockRowCount; + +} TableFooter; + + +/* ColumnBlockSkipNode contains statistics for a ColumnBlockData. */ +typedef struct ColumnBlockSkipNode +{ + /* statistics about values of a column block */ + bool hasMinMax; + Datum minimumValue; + Datum maximumValue; + uint64 rowCount; + + /* + * Offsets and sizes of value and exists streams in the column data. + * These enable us to skip reading suppressed row blocks, and start reading + * a block without reading previous blocks. + */ + uint64 valueBlockOffset; + uint64 valueLength; + uint64 existsBlockOffset; + uint64 existsLength; + + CompressionType valueCompressionType; + +} ColumnBlockSkipNode; + + +/* + * StripeSkipList can be used for skipping row blocks. It contains a column block + * skip node for each block of each column. blockSkipNodeArray[column][block] + * is the entry for the specified column block. + */ +typedef struct StripeSkipList +{ + ColumnBlockSkipNode **blockSkipNodeArray; + uint32 columnCount; + uint32 blockCount; + +} StripeSkipList; + + +/* + * ColumnBlockData represents a block of data in a column. valueArray stores + * the values of data, and existsArray stores whether a value is present. + * valueBuffer is used to store (uncompressed) serialized values + * referenced by Datum's in valueArray. It is only used for by-reference Datum's. + * There is a one-to-one correspondence between valueArray and existsArray. 
+ */ +typedef struct ColumnBlockData +{ + bool *existsArray; + Datum *valueArray; + + /* valueBuffer keeps actual data for type-by-reference datums from valueArray. */ + StringInfo valueBuffer; + +} ColumnBlockData; + + +/* + * ColumnBlockBuffers represents a block of serialized data in a column. + * valueBuffer stores the serialized values of data, and existsBuffer stores + * serialized value of presence information. valueCompressionType contains + * compression type if valueBuffer is compressed. The number of rows in the + * block is not kept here; see rowCount in ColumnBlockSkipNode. + */ +typedef struct ColumnBlockBuffers +{ + StringInfo existsBuffer; + StringInfo valueBuffer; + CompressionType valueCompressionType; + +} ColumnBlockBuffers; + + +/* + * ColumnBuffers represents data buffers for a column in a row stripe. Each + * column is made of multiple column blocks. + */ +typedef struct ColumnBuffers +{ + ColumnBlockBuffers **blockBuffersArray; + +} ColumnBuffers; + + +/* StripeBuffers represents data for a row stripe in a cstore file. */ +typedef struct StripeBuffers +{ + uint32 columnCount; + uint32 rowCount; + ColumnBuffers **columnBuffersArray; + +} StripeBuffers; + + +/* + * StripeFooter represents a stripe's footer. In this footer, we keep three + * arrays of sizes. The number of elements in each of the arrays is equal + * to the number of columns. + */ +typedef struct StripeFooter +{ + uint32 columnCount; + uint64 *skipListSizeArray; + uint64 *existsSizeArray; + uint64 *valueSizeArray; + +} StripeFooter; + + +/* TableReadState represents state of a cstore file read operation. */ +typedef struct TableReadState +{ + FILE *tableFile; + TableFooter *tableFooter; + TupleDesc tupleDescriptor; + + /* + * List of Var pointers for columns in the query. We use this both for + * getting vector of projected columns, and also when we want to build + * base constraint to find selected row blocks. 
+ */ + List *projectedColumnList; + + List *whereClauseList; + MemoryContext stripeReadContext; + StripeBuffers *stripeBuffers; + uint32 readStripeCount; + uint64 stripeReadRowCount; + ColumnBlockData **blockDataArray; + int32 deserializedBlockIndex; + +} TableReadState; + + +/* TableWriteState represents state of a cstore file write operation. */ +typedef struct TableWriteState +{ + FILE *tableFile; + TableFooter *tableFooter; + StringInfo tableFooterFilename; + CompressionType compressionType; + TupleDesc tupleDescriptor; + FmgrInfo **comparisonFunctionArray; + uint64 currentFileOffset; + Relation relation; + + MemoryContext stripeWriteContext; + StripeBuffers *stripeBuffers; + StripeSkipList *stripeSkipList; + uint32 stripeMaxRowCount; + ColumnBlockData **blockDataArray; + /* + * compressionBuffer buffer is used as temporary storage during + * data value compression operation. It is kept here to minimize + * memory allocations. It lives in stripeWriteContext and gets + * deallocated when memory context is reset. 
+ */ + StringInfo compressionBuffer; + +} TableWriteState; + +/* Function declarations for extension loading and unloading */ +extern void _PG_init(void); +extern void _PG_fini(void); + +extern CompressionType ParseCompressionType(const char *compressionTypeString); +extern void InitializeCStoreTableFile(Oid relationId, Relation relation, + CStoreOptions *cstoreOptions); +extern void CreateCStoreDatabaseDirectory(Oid databaseOid); +extern void RemoveCStoreDatabaseDirectory(Oid databaseOid); + +/* Function declarations for writing to a cstore file */ +extern TableWriteState * CStoreBeginWrite(const char *filename, + CompressionType compressionType, + uint64 stripeMaxRowCount, + uint32 blockRowCount, + TupleDesc tupleDescriptor); +extern void CStoreWriteRow(TableWriteState *state, Datum *columnValues, + bool *columnNulls); +extern void CStoreEndWrite(TableWriteState * state); + +/* Function declarations for reading from a cstore file */ +extern TableReadState * CStoreBeginRead(const char *filename, TupleDesc tupleDescriptor, + List *projectedColumnList, List *qualConditions); +extern TableFooter * CStoreReadFooter(StringInfo tableFooterFilename); +extern bool CStoreReadFinished(TableReadState *state); +extern bool CStoreReadNextRow(TableReadState *state, Datum *columnValues, + bool *columnNulls); +extern void CStoreEndRead(TableReadState *state); + +/* Function declarations for common functions */ +extern FmgrInfo * GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId, + int16 procedureId); +extern ColumnBlockData ** CreateEmptyBlockDataArray(uint32 columnCount, bool *columnMask, + uint32 blockRowCount); +extern void FreeColumnBlockDataArray(ColumnBlockData **blockDataArray, + uint32 columnCount); +extern uint64 CStoreTableRowCount(const char *filename); +extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer, + CompressionType compressionType); +extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType); + + +#endif /* 
CSTORE_H */ diff --git a/cstore_fdw.c b/cstore_fdw.c index b0a327768..c80d53f2c 100644 --- a/cstore_fdw.c +++ b/cstore_fdw.c @@ -95,24 +95,18 @@ static List * FindCStoreTables(List *tableList); static List * OpenRelationsForTruncate(List *cstoreTableList); static void TruncateCStoreTables(List *cstoreRelationList); static void DeleteCStoreTableFiles(char *filename); -static void InitializeCStoreTableFile(Oid relationId, Relation relation); static bool CStoreTable(Oid relationId); static bool CStoreServer(ForeignServer *server); static bool DistributedTable(Oid relationId); static bool DistributedWorkerCopy(CopyStmt *copyStatement); -static void CreateCStoreDatabaseDirectory(Oid databaseOid); -static bool DirectoryExists(StringInfo directoryName); -static void CreateDirectory(StringInfo directoryName); -static void RemoveCStoreDatabaseDirectory(Oid databaseOid); static StringInfo OptionNamesString(Oid currentContextId); static HeapTuple GetSlotHeapTuple(TupleTableSlot *tts); -static CStoreFdwOptions * CStoreGetOptions(Oid foreignTableId); +static CStoreOptions * CStoreGetOptions(Oid foreignTableId); static char * CStoreGetOptionValue(Oid foreignTableId, const char *optionName); static void ValidateForeignTableOptions(char *filename, char *compressionTypeString, char *stripeRowCountString, char *blockRowCountString); static char * CStoreDefaultFilePath(Oid foreignTableId); -static CompressionType ParseCompressionType(const char *compressionTypeString); static void CStoreGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId); static void CStoreGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, @@ -160,9 +154,6 @@ static bool CStoreIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte); #endif -/* declarations for dynamic loading */ -PG_MODULE_MAGIC; - PG_FUNCTION_INFO_V1(cstore_ddl_event_end_trigger); PG_FUNCTION_INFO_V1(cstore_table_size); PG_FUNCTION_INFO_V1(cstore_fdw_handler); @@ -175,11 +166,11 @@ static 
ProcessUtility_hook_type PreviousProcessUtilityHook = NULL; /* - * _PG_init is called when the module is loaded. In this function we save the + * Called when the module is loaded. In this function we save the * previous utility hook, and then install our hook to pre-intercept calls to * the copy command. */ -void _PG_init(void) +void cstore_fdw_init() { PreviousProcessUtilityHook = ProcessUtility_hook; ProcessUtility_hook = CStoreProcessUtility; @@ -187,10 +178,10 @@ void _PG_init(void) /* - * _PG_fini is called when the module is unloaded. This function uninstalls the + * Called when the module is unloaded. This function uninstalls the * extension's hooks. */ -void _PG_fini(void) +void cstore_fdw_finish() { ProcessUtility_hook = PreviousProcessUtilityHook; } @@ -249,7 +240,7 @@ cstore_ddl_event_end_trigger(PG_FUNCTION_ARGS) */ CreateCStoreDatabaseDirectory(MyDatabaseId); - InitializeCStoreTableFile(relationId, relation); + InitializeCStoreTableFile(relationId, relation, CStoreGetOptions(relationId)); heap_close(relation, AccessExclusiveLock); } } @@ -525,7 +516,7 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString) Datum *columnValues = NULL; bool *columnNulls = NULL; TableWriteState *writeState = NULL; - CStoreFdwOptions *cstoreFdwOptions = NULL; + CStoreOptions *cstoreOptions = NULL; MemoryContext tupleContext = NULL; /* Only superuser can copy from or to local file */ @@ -546,7 +537,7 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString) columnValues = palloc0(columnCount * sizeof(Datum)); columnNulls = palloc0(columnCount * sizeof(bool)); - cstoreFdwOptions = CStoreGetOptions(relationId); + cstoreOptions = CStoreGetOptions(relationId); /* * We create a new memory context called tuple context, and read and write @@ -580,10 +571,10 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString) #endif /* init state to write to the cstore file */ - writeState = 
CStoreBeginWrite(cstoreFdwOptions->filename, - cstoreFdwOptions->compressionType, - cstoreFdwOptions->stripeRowCount, - cstoreFdwOptions->blockRowCount, + writeState = CStoreBeginWrite(cstoreOptions->filename, + cstoreOptions->compressionType, + cstoreOptions->stripeRowCount, + cstoreOptions->blockRowCount, tupleDescriptor); while (nextRowFound) @@ -765,7 +756,7 @@ DroppedCStoreFilenameList(DropStmt *dropStatement) Oid relationId = RangeVarGetRelid(rangeVar, AccessShareLock, true); if (CStoreTable(relationId)) { - CStoreFdwOptions *cstoreFdwOptions = CStoreGetOptions(relationId); + CStoreOptions *cstoreOptions = CStoreGetOptions(relationId); char *defaultfilename = CStoreDefaultFilePath(relationId); /* @@ -773,13 +764,13 @@ DroppedCStoreFilenameList(DropStmt *dropStatement) * by sql drop trigger. Both paths are generated by code, use * of strcmp is safe here. */ - if (strcmp(defaultfilename, cstoreFdwOptions->filename) == 0) + if (strcmp(defaultfilename, cstoreOptions->filename) == 0) { continue; } droppedCStoreFileList = lappend(droppedCStoreFileList, - cstoreFdwOptions->filename); + cstoreOptions->filename); } } } @@ -857,13 +848,13 @@ TruncateCStoreTables(List *cstoreRelationList) { Relation relation = (Relation) lfirst(relationCell); Oid relationId = relation->rd_id; - CStoreFdwOptions *cstoreFdwOptions = NULL; + CStoreOptions *cstoreOptions = NULL; Assert(CStoreTable(relationId)); - cstoreFdwOptions = CStoreGetOptions(relationId); - DeleteCStoreTableFiles(cstoreFdwOptions->filename); - InitializeCStoreTableFile(relationId, relation); + cstoreOptions = CStoreGetOptions(relationId); + DeleteCStoreTableFiles(cstoreOptions->filename); + InitializeCStoreTableFile(relationId, relation, CStoreGetOptions(relationId)); } } @@ -901,29 +892,6 @@ DeleteCStoreTableFiles(char *filename) } -/* - * InitializeCStoreTableFile creates data and footer file for a cstore table. 
- * The function assumes data and footer files do not exist, therefore - * it should be called on empty or non-existing table. Notice that the caller - * is expected to acquire AccessExclusiveLock on the relation. - */ -static void InitializeCStoreTableFile(Oid relationId, Relation relation) -{ - TableWriteState *writeState = NULL; - TupleDesc tupleDescriptor = RelationGetDescr(relation); - CStoreFdwOptions* cstoreFdwOptions = CStoreGetOptions(relationId); - - /* - * Initialize state to write to the cstore file. This creates an - * empty data file and a valid footer file for the table. - */ - writeState = CStoreBeginWrite(cstoreFdwOptions->filename, - cstoreFdwOptions->compressionType, cstoreFdwOptions->stripeRowCount, - cstoreFdwOptions->blockRowCount, tupleDescriptor); - CStoreEndWrite(writeState); -} - - /* * CStoreTable checks if the given table name belongs to a foreign columnar store @@ -1045,111 +1013,7 @@ DistributedWorkerCopy(CopyStmt *copyStatement) } -/* - * CreateCStoreDatabaseDirectory creates the directory (and parent directories, - * if needed) used to store automatically managed cstore_fdw files. The path to - * the directory is $PGDATA/cstore_fdw/{databaseOid}. 
- */ -static void -CreateCStoreDatabaseDirectory(Oid databaseOid) -{ - bool cstoreDirectoryExists = false; - bool databaseDirectoryExists = false; - StringInfo cstoreDatabaseDirectoryPath = NULL; - StringInfo cstoreDirectoryPath = makeStringInfo(); - appendStringInfo(cstoreDirectoryPath, "%s/%s", DataDir, CSTORE_FDW_NAME); - - cstoreDirectoryExists = DirectoryExists(cstoreDirectoryPath); - if (!cstoreDirectoryExists) - { - CreateDirectory(cstoreDirectoryPath); - } - - cstoreDatabaseDirectoryPath = makeStringInfo(); - appendStringInfo(cstoreDatabaseDirectoryPath, "%s/%s/%u", DataDir, - CSTORE_FDW_NAME, databaseOid); - - databaseDirectoryExists = DirectoryExists(cstoreDatabaseDirectoryPath); - if (!databaseDirectoryExists) - { - CreateDirectory(cstoreDatabaseDirectoryPath); - } -} - - -/* DirectoryExists checks if a directory exists for the given directory name. */ -static bool -DirectoryExists(StringInfo directoryName) -{ - bool directoryExists = true; - struct stat directoryStat; - - int statOK = stat(directoryName->data, &directoryStat); - if (statOK == 0) - { - /* file already exists; check that it is a directory */ - if (!S_ISDIR(directoryStat.st_mode)) - { - ereport(ERROR, (errmsg("\"%s\" is not a directory", directoryName->data), - errhint("You need to remove or rename the file \"%s\".", - directoryName->data))); - } - } - else - { - if (errno == ENOENT) - { - directoryExists = false; - } - else - { - ereport(ERROR, (errcode_for_file_access(), - errmsg("could not stat directory \"%s\": %m", - directoryName->data))); - } - } - - return directoryExists; -} - - -/* CreateDirectory creates a new directory with the given directory name. 
*/ -static void -CreateDirectory(StringInfo directoryName) -{ - int makeOK = mkdir(directoryName->data, S_IRWXU); - if (makeOK != 0) - { - ereport(ERROR, (errcode_for_file_access(), - errmsg("could not create directory \"%s\": %m", - directoryName->data))); - } -} - - -/* - * RemoveCStoreDatabaseDirectory removes CStore directory previously - * created for this database. - * However it does not remove 'cstore_fdw' directory even if there - * are no other databases left. - */ -static void -RemoveCStoreDatabaseDirectory(Oid databaseOid) -{ - StringInfo cstoreDirectoryPath = makeStringInfo(); - StringInfo cstoreDatabaseDirectoryPath = makeStringInfo(); - - appendStringInfo(cstoreDirectoryPath, "%s/%s", DataDir, CSTORE_FDW_NAME); - - appendStringInfo(cstoreDatabaseDirectoryPath, "%s/%s/%u", DataDir, - CSTORE_FDW_NAME, databaseOid); - - if (DirectoryExists(cstoreDatabaseDirectoryPath)) - { - rmtree(cstoreDatabaseDirectoryPath->data, true); - } -} /* @@ -1162,7 +1026,7 @@ cstore_table_size(PG_FUNCTION_ARGS) Oid relationId = PG_GETARG_OID(0); int64 tableSize = 0; - CStoreFdwOptions *cstoreFdwOptions = NULL; + CStoreOptions *cstoreOptions = NULL; char *dataFilename = NULL; StringInfo footerFilename = NULL; int dataFileStatResult = 0; @@ -1176,8 +1040,8 @@ cstore_table_size(PG_FUNCTION_ARGS) ereport(ERROR, (errmsg("relation is not a cstore table"))); } - cstoreFdwOptions = CStoreGetOptions(relationId); - dataFilename = cstoreFdwOptions->filename; + cstoreOptions = CStoreGetOptions(relationId); + dataFilename = cstoreOptions->filename; dataFileStatResult = stat(dataFilename, &dataFileStatBuffer); if (dataFileStatResult != 0) @@ -1402,10 +1266,10 @@ GetSlotHeapTuple(TupleTableSlot *tts) * foreign table, and if not present, falls back to default values. This function * errors out if given option values are considered invalid. 
*/ -static CStoreFdwOptions * +static CStoreOptions * CStoreGetOptions(Oid foreignTableId) { - CStoreFdwOptions *cstoreFdwOptions = NULL; + CStoreOptions *cstoreOptions = NULL; char *filename = NULL; CompressionType compressionType = DEFAULT_COMPRESSION_TYPE; int32 stripeRowCount = DEFAULT_STRIPE_ROW_COUNT; @@ -1445,13 +1309,13 @@ CStoreGetOptions(Oid foreignTableId) filename = CStoreDefaultFilePath(foreignTableId); } - cstoreFdwOptions = palloc0(sizeof(CStoreFdwOptions)); - cstoreFdwOptions->filename = filename; - cstoreFdwOptions->compressionType = compressionType; - cstoreFdwOptions->stripeRowCount = stripeRowCount; - cstoreFdwOptions->blockRowCount = blockRowCount; + cstoreOptions = palloc0(sizeof(CStoreOptions)); + cstoreOptions->filename = filename; + cstoreOptions->compressionType = compressionType; + cstoreOptions->stripeRowCount = stripeRowCount; + cstoreOptions->blockRowCount = blockRowCount; - return cstoreFdwOptions; + return cstoreOptions; } @@ -1577,26 +1441,6 @@ CStoreDefaultFilePath(Oid foreignTableId) } -/* ParseCompressionType converts a string to a compression type. */ -static CompressionType -ParseCompressionType(const char *compressionTypeString) -{ - CompressionType compressionType = COMPRESSION_TYPE_INVALID; - Assert(compressionTypeString != NULL); - - if (strncmp(compressionTypeString, COMPRESSION_STRING_NONE, NAMEDATALEN) == 0) - { - compressionType = COMPRESSION_NONE; - } - else if (strncmp(compressionTypeString, COMPRESSION_STRING_PG_LZ, NAMEDATALEN) == 0) - { - compressionType = COMPRESSION_PG_LZ; - } - - return compressionType; -} - - /* * CStoreGetForeignRelSize obtains relation size estimates for a foreign table and * puts its estimate for row count into baserel->rows. 
@@ -1604,8 +1448,8 @@ ParseCompressionType(const char *compressionTypeString) static void CStoreGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId) { - CStoreFdwOptions *cstoreFdwOptions = CStoreGetOptions(foreignTableId); - double tupleCountEstimate = TupleCountEstimate(baserel, cstoreFdwOptions->filename); + CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId); + double tupleCountEstimate = TupleCountEstimate(baserel, cstoreOptions->filename); double rowSelectivity = clauselist_selectivity(root, baserel->baserestrictinfo, 0, JOIN_INNER, NULL); @@ -1624,7 +1468,7 @@ static void CStoreGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId) { Path *foreignScanPath = NULL; - CStoreFdwOptions *cstoreFdwOptions = CStoreGetOptions(foreignTableId); + CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId); Relation relation = heap_open(foreignTableId, AccessShareLock); /* @@ -1645,14 +1489,14 @@ CStoreGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId */ List *queryColumnList = ColumnList(baserel, foreignTableId); uint32 queryColumnCount = list_length(queryColumnList); - BlockNumber relationPageCount = PageCount(cstoreFdwOptions->filename); + BlockNumber relationPageCount = PageCount(cstoreOptions->filename); uint32 relationColumnCount = RelationGetNumberOfAttributes(relation); double queryColumnRatio = (double) queryColumnCount / relationColumnCount; double queryPageCount = relationPageCount * queryColumnRatio; double totalDiskAccessCost = seq_page_cost * queryPageCount; - double tupleCountEstimate = TupleCountEstimate(baserel, cstoreFdwOptions->filename); + double tupleCountEstimate = TupleCountEstimate(baserel, cstoreOptions->filename); /* * We estimate costs almost the same way as cost_seqscan(), thus assuming @@ -1922,16 +1766,16 @@ static void CStoreExplainForeignScan(ForeignScanState *scanState, ExplainState *explainState) { Oid foreignTableId = 
RelationGetRelid(scanState->ss.ss_currentRelation); - CStoreFdwOptions *cstoreFdwOptions = CStoreGetOptions(foreignTableId); + CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId); - ExplainPropertyText("CStore File", cstoreFdwOptions->filename, explainState); + ExplainPropertyText("CStore File", cstoreOptions->filename, explainState); /* supress file size if we're not showing cost details */ if (explainState->costs) { struct stat statBuffer; - int statResult = stat(cstoreFdwOptions->filename, &statBuffer); + int statResult = stat(cstoreOptions->filename, &statBuffer); if (statResult == 0) { ExplainPropertyLong("CStore File Size", (long) statBuffer.st_size, @@ -1947,7 +1791,7 @@ CStoreBeginForeignScan(ForeignScanState *scanState, int executorFlags) { TableReadState *readState = NULL; Oid foreignTableId = InvalidOid; - CStoreFdwOptions *cstoreFdwOptions = NULL; + CStoreOptions *cstoreOptions = NULL; Relation currentRelation = scanState->ss.ss_currentRelation; TupleDesc tupleDescriptor = RelationGetDescr(currentRelation); List *columnList = NIL; @@ -1962,14 +1806,14 @@ CStoreBeginForeignScan(ForeignScanState *scanState, int executorFlags) } foreignTableId = RelationGetRelid(scanState->ss.ss_currentRelation); - cstoreFdwOptions = CStoreGetOptions(foreignTableId); + cstoreOptions = CStoreGetOptions(foreignTableId); foreignScan = (ForeignScan *) scanState->ss.ps.plan; foreignPrivateList = (List *) foreignScan->fdw_private; whereClauseList = foreignScan->scan.plan.qual; columnList = (List *) linitial(foreignPrivateList); - readState = CStoreBeginRead(cstoreFdwOptions->filename, tupleDescriptor, + readState = CStoreBeginRead(cstoreOptions->filename, tupleDescriptor, columnList, whereClauseList); scanState->fdw_state = (void *) readState; @@ -2040,18 +1884,18 @@ CStoreAnalyzeForeignTable(Relation relation, BlockNumber *totalPageCount) { Oid foreignTableId = RelationGetRelid(relation); - CStoreFdwOptions *cstoreFdwOptions = CStoreGetOptions(foreignTableId); + 
CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId); struct stat statBuffer; - int statResult = stat(cstoreFdwOptions->filename, &statBuffer); + int statResult = stat(cstoreOptions->filename, &statBuffer); if (statResult < 0) { ereport(ERROR, (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", - cstoreFdwOptions->filename))); + cstoreOptions->filename))); } - (*totalPageCount) = PageCount(cstoreFdwOptions->filename); + (*totalPageCount) = PageCount(cstoreOptions->filename); (*acquireSampleRowsFunc) = CStoreAcquireSampleRows; return true; @@ -2311,20 +2155,20 @@ static void CStoreBeginForeignInsert(ModifyTableState *modifyTableState, ResultRelInfo *relationInfo) { Oid foreignTableOid = InvalidOid; - CStoreFdwOptions *cstoreFdwOptions = NULL; + CStoreOptions *cstoreOptions = NULL; TupleDesc tupleDescriptor = NULL; TableWriteState *writeState = NULL; Relation relation = NULL; foreignTableOid = RelationGetRelid(relationInfo->ri_RelationDesc); relation = heap_open(foreignTableOid, ShareUpdateExclusiveLock); - cstoreFdwOptions = CStoreGetOptions(foreignTableOid); + cstoreOptions = CStoreGetOptions(foreignTableOid); tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc); - writeState = CStoreBeginWrite(cstoreFdwOptions->filename, - cstoreFdwOptions->compressionType, - cstoreFdwOptions->stripeRowCount, - cstoreFdwOptions->blockRowCount, + writeState = CStoreBeginWrite(cstoreOptions->filename, + cstoreOptions->compressionType, + cstoreOptions->stripeRowCount, + cstoreOptions->blockRowCount, tupleDescriptor); writeState->relation = relation; diff --git a/cstore_fdw.h b/cstore_fdw.h index 2bc3e9c97..7b8475497 100644 --- a/cstore_fdw.h +++ b/cstore_fdw.h @@ -22,41 +22,7 @@ #include "lib/stringinfo.h" #include "utils/rel.h" - -/* Defines for valid option names */ -#define OPTION_NAME_FILENAME "filename" -#define OPTION_NAME_COMPRESSION_TYPE "compression" -#define OPTION_NAME_STRIPE_ROW_COUNT "stripe_row_count" -#define 
OPTION_NAME_BLOCK_ROW_COUNT "block_row_count" - -/* Default values for option parameters */ -#define DEFAULT_COMPRESSION_TYPE COMPRESSION_NONE -#define DEFAULT_STRIPE_ROW_COUNT 150000 -#define DEFAULT_BLOCK_ROW_COUNT 10000 - -/* Limits for option parameters */ -#define STRIPE_ROW_COUNT_MINIMUM 1000 -#define STRIPE_ROW_COUNT_MAXIMUM 10000000 -#define BLOCK_ROW_COUNT_MINIMUM 1000 -#define BLOCK_ROW_COUNT_MAXIMUM 100000 - -/* String representations of compression types */ -#define COMPRESSION_STRING_NONE "none" -#define COMPRESSION_STRING_PG_LZ "pglz" -#define COMPRESSION_STRING_DELIMITED_LIST "none, pglz" - -/* CStore file signature */ -#define CSTORE_MAGIC_NUMBER "citus_cstore" -#define CSTORE_VERSION_MAJOR 1 -#define CSTORE_VERSION_MINOR 7 - -/* miscellaneous defines */ -#define CSTORE_FDW_NAME "cstore_fdw" -#define CSTORE_FOOTER_FILE_SUFFIX ".footer" -#define CSTORE_TEMP_FILE_SUFFIX ".tmp" -#define CSTORE_TUPLE_COST_MULTIPLIER 10 -#define CSTORE_POSTSCRIPT_SIZE_LENGTH 1 -#define CSTORE_POSTSCRIPT_SIZE_MAX 256 +#include "cstore.h" /* table containing information about how to partition distributed tables */ #define CITUS_EXTENSION_NAME "citus" @@ -67,7 +33,6 @@ #define ATTR_NUM_PARTITION_TYPE 2 #define ATTR_NUM_PARTITION_KEY 3 - /* * CStoreValidOption keeps an option name and a context. 
When an option is passed * into cstore_fdw objects (server and foreign table), we compare this option's @@ -80,6 +45,7 @@ typedef struct CStoreValidOption } CStoreValidOption; +#define COMPRESSION_STRING_DELIMITED_LIST "none, pglz" /* Array of options that are valid for cstore_fdw */ static const uint32 ValidOptionCount = 4; @@ -92,220 +58,8 @@ static const CStoreValidOption ValidOptionArray[] = { OPTION_NAME_BLOCK_ROW_COUNT, ForeignTableRelationId } }; - -/* Enumaration for cstore file's compression method */ -typedef enum -{ - COMPRESSION_TYPE_INVALID = -1, - COMPRESSION_NONE = 0, - COMPRESSION_PG_LZ = 1, - - COMPRESSION_COUNT - -} CompressionType; - - -/* - * CStoreFdwOptions holds the option values to be used when reading or writing - * a cstore file. To resolve these values, we first check foreign table's options, - * and if not present, we then fall back to the default values specified above. - */ -typedef struct CStoreFdwOptions -{ - char *filename; - CompressionType compressionType; - uint64 stripeRowCount; - uint32 blockRowCount; - -} CStoreFdwOptions; - - -/* - * StripeMetadata represents information about a stripe. This information is - * stored in the cstore file's footer. - */ -typedef struct StripeMetadata -{ - uint64 fileOffset; - uint64 skipListLength; - uint64 dataLength; - uint64 footerLength; - -} StripeMetadata; - - -/* TableFooter represents the footer of a cstore file. */ -typedef struct TableFooter -{ - List *stripeMetadataList; - uint64 blockRowCount; - -} TableFooter; - - -/* ColumnBlockSkipNode contains statistics for a ColumnBlockData. */ -typedef struct ColumnBlockSkipNode -{ - /* statistics about values of a column block */ - bool hasMinMax; - Datum minimumValue; - Datum maximumValue; - uint64 rowCount; - - /* - * Offsets and sizes of value and exists streams in the column data. - * These enable us to skip reading suppressed row blocks, and start reading - * a block without reading previous blocks. 
- */ - uint64 valueBlockOffset; - uint64 valueLength; - uint64 existsBlockOffset; - uint64 existsLength; - - CompressionType valueCompressionType; - -} ColumnBlockSkipNode; - - -/* - * StripeSkipList can be used for skipping row blocks. It contains a column block - * skip node for each block of each column. blockSkipNodeArray[column][block] - * is the entry for the specified column block. - */ -typedef struct StripeSkipList -{ - ColumnBlockSkipNode **blockSkipNodeArray; - uint32 columnCount; - uint32 blockCount; - -} StripeSkipList; - - -/* - * ColumnBlockData represents a block of data in a column. valueArray stores - * the values of data, and existsArray stores whether a value is present. - * valueBuffer is used to store (uncompressed) serialized values - * referenced by Datum's in valueArray. It is only used for by-reference Datum's. - * There is a one-to-one correspondence between valueArray and existsArray. - */ -typedef struct ColumnBlockData -{ - bool *existsArray; - Datum *valueArray; - - /* valueBuffer keeps actual data for type-by-reference datums from valueArray. */ - StringInfo valueBuffer; - -} ColumnBlockData; - - -/* - * ColumnBlockBuffers represents a block of serialized data in a column. - * valueBuffer stores the serialized values of data, and existsBuffer stores - * serialized value of presence information. valueCompressionType contains - * compression type if valueBuffer is compressed. Finally rowCount has - * the number of rows in this block. - */ -typedef struct ColumnBlockBuffers -{ - StringInfo existsBuffer; - StringInfo valueBuffer; - CompressionType valueCompressionType; - -} ColumnBlockBuffers; - - -/* - * ColumnBuffers represents data buffers for a column in a row stripe. Each - * column is made of multiple column blocks. - */ -typedef struct ColumnBuffers -{ - ColumnBlockBuffers **blockBuffersArray; - -} ColumnBuffers; - - -/* StripeBuffers represents data for a row stripe in a cstore file. 
*/ -typedef struct StripeBuffers -{ - uint32 columnCount; - uint32 rowCount; - ColumnBuffers **columnBuffersArray; - -} StripeBuffers; - - -/* - * StripeFooter represents a stripe's footer. In this footer, we keep three - * arrays of sizes. The number of elements in each of the arrays is equal - * to the number of columns. - */ -typedef struct StripeFooter -{ - uint32 columnCount; - uint64 *skipListSizeArray; - uint64 *existsSizeArray; - uint64 *valueSizeArray; - -} StripeFooter; - - -/* TableReadState represents state of a cstore file read operation. */ -typedef struct TableReadState -{ - FILE *tableFile; - TableFooter *tableFooter; - TupleDesc tupleDescriptor; - - /* - * List of Var pointers for columns in the query. We use this both for - * getting vector of projected columns, and also when we want to build - * base constraint to find selected row blocks. - */ - List *projectedColumnList; - - List *whereClauseList; - MemoryContext stripeReadContext; - StripeBuffers *stripeBuffers; - uint32 readStripeCount; - uint64 stripeReadRowCount; - ColumnBlockData **blockDataArray; - int32 deserializedBlockIndex; - -} TableReadState; - - -/* TableWriteState represents state of a cstore file write operation. */ -typedef struct TableWriteState -{ - FILE *tableFile; - TableFooter *tableFooter; - StringInfo tableFooterFilename; - CompressionType compressionType; - TupleDesc tupleDescriptor; - FmgrInfo **comparisonFunctionArray; - uint64 currentFileOffset; - Relation relation; - - MemoryContext stripeWriteContext; - StripeBuffers *stripeBuffers; - StripeSkipList *stripeSkipList; - uint32 stripeMaxRowCount; - ColumnBlockData **blockDataArray; - /* - * compressionBuffer buffer is used as temporary storage during - * data value compression operation. It is kept here to minimize - * memory allocations. It lives in stripeWriteContext and gets - * deallocated when memory context is reset. 
- */ - StringInfo compressionBuffer; - -} TableWriteState; - -/* Function declarations for extension loading and unloading */ -extern void _PG_init(void); -extern void _PG_fini(void); +void cstore_fdw_init(void); +void cstore_fdw_finish(void); /* event trigger function declarations */ extern Datum cstore_ddl_event_end_trigger(PG_FUNCTION_ARGS); @@ -318,36 +72,4 @@ extern Datum cstore_clean_table_resources(PG_FUNCTION_ARGS); extern Datum cstore_fdw_handler(PG_FUNCTION_ARGS); extern Datum cstore_fdw_validator(PG_FUNCTION_ARGS); -/* Function declarations for writing to a cstore file */ -extern TableWriteState * CStoreBeginWrite(const char *filename, - CompressionType compressionType, - uint64 stripeMaxRowCount, - uint32 blockRowCount, - TupleDesc tupleDescriptor); -extern void CStoreWriteRow(TableWriteState *state, Datum *columnValues, - bool *columnNulls); -extern void CStoreEndWrite(TableWriteState * state); - -/* Function declarations for reading from a cstore file */ -extern TableReadState * CStoreBeginRead(const char *filename, TupleDesc tupleDescriptor, - List *projectedColumnList, List *qualConditions); -extern TableFooter * CStoreReadFooter(StringInfo tableFooterFilename); -extern bool CStoreReadFinished(TableReadState *state); -extern bool CStoreReadNextRow(TableReadState *state, Datum *columnValues, - bool *columnNulls); -extern void CStoreEndRead(TableReadState *state); - -/* Function declarations for common functions */ -extern FmgrInfo * GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId, - int16 procedureId); -extern ColumnBlockData ** CreateEmptyBlockDataArray(uint32 columnCount, bool *columnMask, - uint32 blockRowCount); -extern void FreeColumnBlockDataArray(ColumnBlockData **blockDataArray, - uint32 columnCount); -extern uint64 CStoreTableRowCount(const char *filename); -extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer, - CompressionType compressionType); -extern StringInfo DecompressBuffer(StringInfo buffer, 
CompressionType compressionType); - - #endif /* CSTORE_FDW_H */ diff --git a/mod.c b/mod.c new file mode 100644 index 000000000..aa65ac0ec --- /dev/null +++ b/mod.c @@ -0,0 +1,30 @@ +/*------------------------------------------------------------------------- + * + * mod.c + * + * This file contains module-level definitions. + * + * Copyright (c) 2016, Citus Data, Inc. + * + * $Id$ + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "cstore_fdw.h" + +PG_MODULE_MAGIC; + +void _PG_init(void) +{ + cstore_fdw_init(); +} + + +void _PG_fini(void) +{ + cstore_fdw_finish(); +} +