Refactor the FDW API to take code out of cstore_fdw.c.

merge-cstore-pykello
Jeff Davis 2020-08-31 11:39:08 -07:00
parent abc9fbe1c3
commit ba506acd35
6 changed files with 568 additions and 491 deletions

View File

@ -7,8 +7,8 @@ MODULE_big = cstore_fdw
PG_CPPFLAGS = --std=c99
SHLIB_LINK = -lprotobuf-c
OBJS = cstore.pb-c.o cstore_fdw.o cstore_writer.o cstore_reader.o \
cstore_metadata_serialization.o cstore_compression.o
OBJS = cstore.pb-c.o cstore.o cstore_fdw.o cstore_writer.o cstore_reader.o \
cstore_metadata_serialization.o cstore_compression.o mod.o
EXTENSION = cstore_fdw
DATA = cstore_fdw--1.7.sql cstore_fdw--1.6--1.7.sql cstore_fdw--1.5--1.6.sql cstore_fdw--1.4--1.5.sql \

170
cstore.c Normal file
View File

@ -0,0 +1,170 @@
/*-------------------------------------------------------------------------
*
* cstore.c
*
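* This file contains common cstore helper functions: compression type
* parsing, cstore data directory management, and table file initialization.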
*
* Copyright (c) 2016, Citus Data, Inc.
*
* $Id$
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "cstore.h"
#include <sys/stat.h>
static void CreateDirectory(StringInfo directoryName);
static bool DirectoryExists(StringInfo directoryName);
/*
 * ParseCompressionType maps the textual name of a compression method to the
 * corresponding CompressionType value. A name that matches none of the known
 * compression strings yields COMPRESSION_TYPE_INVALID. The argument must not
 * be NULL (checked with Assert only).
 */
CompressionType
ParseCompressionType(const char *compressionTypeString)
{
	Assert(compressionTypeString != NULL);

	if (strncmp(compressionTypeString, COMPRESSION_STRING_NONE, NAMEDATALEN) == 0)
	{
		return COMPRESSION_NONE;
	}

	if (strncmp(compressionTypeString, COMPRESSION_STRING_PG_LZ, NAMEDATALEN) == 0)
	{
		return COMPRESSION_PG_LZ;
	}

	/* no known compression name matched */
	return COMPRESSION_TYPE_INVALID;
}
/*
 * CreateDirectory makes a new directory at the given path, accessible only
 * to the owner (S_IRWXU). Any mkdir failure is raised as an ERROR that
 * carries the system error message.
 */
static void
CreateDirectory(StringInfo directoryName)
{
	if (mkdir(directoryName->data, S_IRWXU) == 0)
	{
		/* directory created, nothing else to do */
		return;
	}

	ereport(ERROR, (errcode_for_file_access(),
					errmsg("could not create directory \"%s\": %m",
						   directoryName->data)));
}
/*
 * DirectoryExists reports whether the given path names an existing directory.
 *
 * Returns true when the path exists and is a directory, and false when
 * nothing exists at the path (stat fails with ENOENT). Raises an ERROR when
 * the path exists but is not a directory, or when stat fails for any other
 * reason.
 */
static bool
DirectoryExists(StringInfo directoryName)
{
	struct stat pathStat;

	if (stat(directoryName->data, &pathStat) != 0)
	{
		if (errno == ENOENT)
		{
			/* nothing at this path */
			return false;
		}

		ereport(ERROR, (errcode_for_file_access(),
						errmsg("could not stat directory \"%s\": %m",
							   directoryName->data)));
	}
	else if (!S_ISDIR(pathStat.st_mode))
	{
		/* something exists at the path, but it is not a directory */
		ereport(ERROR, (errmsg("\"%s\" is not a directory", directoryName->data),
						errhint("You need to remove or rename the file \"%s\".",
								directoryName->data)));
	}

	return true;
}
/*
 * RemoveCStoreDatabaseDirectory removes the CStore directory previously
 * created for the given database, i.e. DataDir/cstore_fdw/{databaseOid},
 * together with its contents.
 *
 * The parent 'cstore_fdw' directory is never removed, even if no other
 * database directories are left in it. If the database directory does not
 * exist this is a no-op; DirectoryExists raises an ERROR if the path exists
 * but is not a directory.
 *
 * The boolean result of rmtree is not checked, so a failed removal does not
 * raise an error from this function.
 */
void
RemoveCStoreDatabaseDirectory(Oid databaseOid)
{
	StringInfo cstoreDatabaseDirectoryPath = makeStringInfo();

	appendStringInfo(cstoreDatabaseDirectoryPath, "%s/%s/%u", DataDir,
					 CSTORE_FDW_NAME, databaseOid);

	if (DirectoryExists(cstoreDatabaseDirectoryPath))
	{
		/* second argument: also remove the top-level directory itself */
		rmtree(cstoreDatabaseDirectoryPath->data, true);
	}
}
/*
 * InitializeCStoreTableFile creates the data and footer files for a cstore
 * table by opening a write state with the supplied options and closing it
 * again right away, without writing any rows.
 *
 * The function assumes that the data and footer files do not exist yet, so
 * it should only be called for an empty or not-yet-existing table. The
 * caller is expected to hold AccessExclusiveLock on the relation.
 *
 * Note that relationId is not used by the function body; the file name and
 * the write parameters all come from cstoreOptions.
 */
void
InitializeCStoreTableFile(Oid relationId, Relation relation, CStoreOptions *cstoreOptions)
{
	TupleDesc relationDescriptor = RelationGetDescr(relation);

	/* beginning a write creates an empty data file and a valid footer file */
	TableWriteState *emptyWriteState =
		CStoreBeginWrite(cstoreOptions->filename,
						 cstoreOptions->compressionType,
						 cstoreOptions->stripeRowCount,
						 cstoreOptions->blockRowCount,
						 relationDescriptor);

	CStoreEndWrite(emptyWriteState);
}
/*
 * CreateCStoreDatabaseDirectory makes sure the directory that holds the
 * automatically managed cstore_fdw files of a database exists, creating it
 * (and its parent directory) when missing. The resulting path is
 * $PGDATA/cstore_fdw/{databaseOid}.
 */
void
CreateCStoreDatabaseDirectory(Oid databaseOid)
{
	StringInfo databasePath = NULL;
	StringInfo parentPath = makeStringInfo();

	/* first level: $PGDATA/cstore_fdw */
	appendStringInfo(parentPath, "%s/%s", DataDir, CSTORE_FDW_NAME);
	if (!DirectoryExists(parentPath))
	{
		CreateDirectory(parentPath);
	}

	/* second level: $PGDATA/cstore_fdw/{databaseOid} */
	databasePath = makeStringInfo();
	appendStringInfo(databasePath, "%s/%s/%u", DataDir,
					 CSTORE_FDW_NAME, databaseOid);
	if (!DirectoryExists(databasePath))
	{
		CreateDirectory(databasePath);
	}
}

311
cstore.h Normal file
View File

@ -0,0 +1,311 @@
/*-------------------------------------------------------------------------
*
* cstore.h
*
* Type and function declarations for CStore
*
* Copyright (c) 2016, Citus Data, Inc.
*
* $Id$
*
*-------------------------------------------------------------------------
*/
#ifndef CSTORE_H
#define CSTORE_H
#include "access/tupdesc.h"
#include "fmgr.h"
#include "catalog/pg_am.h"
#include "catalog/pg_foreign_server.h"
#include "catalog/pg_foreign_table.h"
#include "lib/stringinfo.h"
#include "utils/rel.h"
/* Defines for valid option names */
#define OPTION_NAME_FILENAME "filename"
#define OPTION_NAME_COMPRESSION_TYPE "compression"
#define OPTION_NAME_STRIPE_ROW_COUNT "stripe_row_count"
#define OPTION_NAME_BLOCK_ROW_COUNT "block_row_count"
/* Default values for option parameters */
#define DEFAULT_COMPRESSION_TYPE COMPRESSION_NONE
#define DEFAULT_STRIPE_ROW_COUNT 150000
#define DEFAULT_BLOCK_ROW_COUNT 10000
/* Limits for option parameters */
#define STRIPE_ROW_COUNT_MINIMUM 1000
#define STRIPE_ROW_COUNT_MAXIMUM 10000000
#define BLOCK_ROW_COUNT_MINIMUM 1000
#define BLOCK_ROW_COUNT_MAXIMUM 100000
/* String representations of compression types */
#define COMPRESSION_STRING_NONE "none"
#define COMPRESSION_STRING_PG_LZ "pglz"
/* CStore file signature */
#define CSTORE_MAGIC_NUMBER "citus_cstore"
#define CSTORE_VERSION_MAJOR 1
#define CSTORE_VERSION_MINOR 7
/* miscellaneous defines */
#define CSTORE_FDW_NAME "cstore_fdw"
#define CSTORE_FOOTER_FILE_SUFFIX ".footer"
#define CSTORE_TEMP_FILE_SUFFIX ".tmp"
#define CSTORE_TUPLE_COST_MULTIPLIER 10
#define CSTORE_POSTSCRIPT_SIZE_LENGTH 1
#define CSTORE_POSTSCRIPT_SIZE_MAX 256
/*
 * Enumeration for a cstore file's compression method.
 *
 * COMPRESSION_TYPE_INVALID is the value ParseCompressionType returns for a
 * name it does not recognize. COMPRESSION_COUNT follows the last real method,
 * so it equals the number of valid compression methods.
 */
typedef enum
{
COMPRESSION_TYPE_INVALID = -1,
COMPRESSION_NONE = 0,
COMPRESSION_PG_LZ = 1,
COMPRESSION_COUNT
} CompressionType;
/*
 * CStoreOptions holds the option values to be used when reading or writing
 * a cstore file. To resolve these values, we first check foreign table's options,
 * and if not present, we then fall back to the default values specified above.
 */
typedef struct CStoreOptions
{
/* path of the table's data file */
char *filename;
/* compression method passed to CStoreBeginWrite */
CompressionType compressionType;
/* maximum row count of a stripe (see STRIPE_ROW_COUNT_MINIMUM/MAXIMUM) */
uint64 stripeRowCount;
/* row count of a block (see BLOCK_ROW_COUNT_MINIMUM/MAXIMUM) */
uint32 blockRowCount;
} CStoreOptions;
/*
 * StripeMetadata represents information about a stripe. This information is
 * stored in the cstore file's footer.
 *
 * NOTE(review): the field meanings below are inferred from the names only;
 * confirm against the writer/reader code.
 */
typedef struct StripeMetadata
{
/* presumably where the stripe starts within the data file */
uint64 fileOffset;
/* presumably the sizes of the stripe's skip list, data and footer sections */
uint64 skipListLength;
uint64 dataLength;
uint64 footerLength;
} StripeMetadata;
/* TableFooter represents the footer of a cstore file. */
typedef struct TableFooter
{
/* presumably a list of StripeMetadata pointers, one per stripe -- TODO confirm */
List *stripeMetadataList;
/*
 * Note this is uint64, while CStoreOptions.blockRowCount and the
 * blockRowCount parameter of CStoreBeginWrite are uint32.
 */
uint64 blockRowCount;
} TableFooter;
/* ColumnBlockSkipNode contains statistics for a ColumnBlockData. */
typedef struct ColumnBlockSkipNode
{
/*
 * statistics about values of a column block; hasMinMax presumably tells
 * whether minimumValue and maximumValue are set -- TODO confirm
 */
bool hasMinMax;
Datum minimumValue;
Datum maximumValue;
uint64 rowCount;
/*
 * Offsets and sizes of value and exists streams in the column data.
 * These enable us to skip reading suppressed row blocks, and start reading
 * a block without reading previous blocks.
 */
uint64 valueBlockOffset;
uint64 valueLength;
uint64 existsBlockOffset;
uint64 existsLength;
/* compression method recorded for this block's value stream */
CompressionType valueCompressionType;
} ColumnBlockSkipNode;
/*
 * StripeSkipList can be used for skipping row blocks. It contains a column block
 * skip node for each block of each column. blockSkipNodeArray[column][block]
 * is the entry for the specified column block.
 */
typedef struct StripeSkipList
{
ColumnBlockSkipNode **blockSkipNodeArray;
/* presumably the two dimensions of blockSkipNodeArray -- TODO confirm */
uint32 columnCount;
uint32 blockCount;
} StripeSkipList;
/*
 * ColumnBlockData represents a block of data in a column. valueArray stores
 * the values of data, and existsArray stores whether a value is present.
 * valueBuffer is used to store (uncompressed) serialized values
 * referenced by Datum's in valueArray. It is only used for by-reference Datum's.
 * There is a one-to-one correspondence between valueArray and existsArray.
 */
typedef struct ColumnBlockData
{
/* existsArray[i] tells whether valueArray[i] holds a value */
bool *existsArray;
Datum *valueArray;
/* valueBuffer keeps actual data for type-by-reference datums from valueArray. */
StringInfo valueBuffer;
} ColumnBlockData;
/*
 * ColumnBlockBuffers represents a block of serialized data in a column.
 * valueBuffer stores the serialized values of data, and existsBuffer stores
 * serialized value of presence information. valueCompressionType contains
 * compression type if valueBuffer is compressed.
 *
 * This struct has no row count member; a per-block rowCount is kept in
 * ColumnBlockSkipNode instead.
 */
typedef struct ColumnBlockBuffers
{
StringInfo existsBuffer;
StringInfo valueBuffer;
CompressionType valueCompressionType;
} ColumnBlockBuffers;
/*
 * ColumnBuffers represents data buffers for a column in a row stripe. Each
 * column is made of multiple column blocks.
 */
typedef struct ColumnBuffers
{
/* presumably one ColumnBlockBuffers pointer per block of the column -- TODO confirm */
ColumnBlockBuffers **blockBuffersArray;
} ColumnBuffers;
/* StripeBuffers represents data for a row stripe in a cstore file. */
typedef struct StripeBuffers
{
uint32 columnCount;
uint32 rowCount;
/* presumably columnCount entries, one ColumnBuffers pointer per column -- TODO confirm */
ColumnBuffers **columnBuffersArray;
} StripeBuffers;
/*
 * StripeFooter represents a stripe's footer. In this footer, we keep three
 * arrays of sizes. The number of elements in each of the arrays is equal
 * to the number of columns.
 */
typedef struct StripeFooter
{
/* number of columns, and therefore the length of each array below */
uint32 columnCount;
/* presumably per-column sizes of the skip list, exists and value sections -- TODO confirm */
uint64 *skipListSizeArray;
uint64 *existsSizeArray;
uint64 *valueSizeArray;
} StripeFooter;
/*
 * TableReadState represents state of a cstore file read operation. It is
 * returned by CStoreBeginRead and passed to CStoreReadNextRow,
 * CStoreReadFinished and CStoreEndRead.
 */
typedef struct TableReadState
{
FILE *tableFile;
TableFooter *tableFooter;
TupleDesc tupleDescriptor;
/*
 * List of Var pointers for columns in the query. We use this both for
 * getting vector of projected columns, and also when we want to build
 * base constraint to find selected row blocks.
 */
List *projectedColumnList;
List *whereClauseList;
/* NOTE(review): the fields below look like per-stripe read progress -- confirm in the reader */
MemoryContext stripeReadContext;
StripeBuffers *stripeBuffers;
uint32 readStripeCount;
uint64 stripeReadRowCount;
ColumnBlockData **blockDataArray;
int32 deserializedBlockIndex;
} TableReadState;
/*
 * TableWriteState represents state of a cstore file write operation. It is
 * returned by CStoreBeginWrite and passed to CStoreWriteRow and
 * CStoreEndWrite.
 */
typedef struct TableWriteState
{
FILE *tableFile;
TableFooter *tableFooter;
StringInfo tableFooterFilename;
CompressionType compressionType;
TupleDesc tupleDescriptor;
FmgrInfo **comparisonFunctionArray;
uint64 currentFileOffset;
/* not a CStoreBeginWrite parameter; CStoreBeginForeignInsert assigns it after the call */
Relation relation;
MemoryContext stripeWriteContext;
StripeBuffers *stripeBuffers;
StripeSkipList *stripeSkipList;
/*
 * NOTE(review): uint32 here, while CStoreBeginWrite takes stripeMaxRowCount
 * as uint64 -- confirm how the value is narrowed.
 */
uint32 stripeMaxRowCount;
ColumnBlockData **blockDataArray;
/*
 * compressionBuffer buffer is used as temporary storage during
 * data value compression operation. It is kept here to minimize
 * memory allocations. It lives in stripeWriteContext and gets
 * deallocated when memory context is reset.
 */
StringInfo compressionBuffer;
} TableWriteState;
/* Function declarations for extension loading and unloading */
extern void _PG_init(void);
extern void _PG_fini(void);
extern CompressionType ParseCompressionType(const char *compressionTypeString);
extern void InitializeCStoreTableFile(Oid relationId, Relation relation,
CStoreOptions *cstoreOptions);
extern void CreateCStoreDatabaseDirectory(Oid databaseOid);
extern void RemoveCStoreDatabaseDirectory(Oid databaseOid);
/* Function declarations for writing to a cstore file */
extern TableWriteState * CStoreBeginWrite(const char *filename,
CompressionType compressionType,
uint64 stripeMaxRowCount,
uint32 blockRowCount,
TupleDesc tupleDescriptor);
extern void CStoreWriteRow(TableWriteState *state, Datum *columnValues,
bool *columnNulls);
extern void CStoreEndWrite(TableWriteState * state);
/* Function declarations for reading from a cstore file */
extern TableReadState * CStoreBeginRead(const char *filename, TupleDesc tupleDescriptor,
List *projectedColumnList, List *qualConditions);
extern TableFooter * CStoreReadFooter(StringInfo tableFooterFilename);
extern bool CStoreReadFinished(TableReadState *state);
extern bool CStoreReadNextRow(TableReadState *state, Datum *columnValues,
bool *columnNulls);
extern void CStoreEndRead(TableReadState *state);
/* Function declarations for common functions */
extern FmgrInfo * GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId,
int16 procedureId);
extern ColumnBlockData ** CreateEmptyBlockDataArray(uint32 columnCount, bool *columnMask,
uint32 blockRowCount);
extern void FreeColumnBlockDataArray(ColumnBlockData **blockDataArray,
uint32 columnCount);
extern uint64 CStoreTableRowCount(const char *filename);
extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer,
CompressionType compressionType);
extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType);
#endif /* CSTORE_H */

View File

@ -95,24 +95,18 @@ static List * FindCStoreTables(List *tableList);
static List * OpenRelationsForTruncate(List *cstoreTableList);
static void TruncateCStoreTables(List *cstoreRelationList);
static void DeleteCStoreTableFiles(char *filename);
static void InitializeCStoreTableFile(Oid relationId, Relation relation);
static bool CStoreTable(Oid relationId);
static bool CStoreServer(ForeignServer *server);
static bool DistributedTable(Oid relationId);
static bool DistributedWorkerCopy(CopyStmt *copyStatement);
static void CreateCStoreDatabaseDirectory(Oid databaseOid);
static bool DirectoryExists(StringInfo directoryName);
static void CreateDirectory(StringInfo directoryName);
static void RemoveCStoreDatabaseDirectory(Oid databaseOid);
static StringInfo OptionNamesString(Oid currentContextId);
static HeapTuple GetSlotHeapTuple(TupleTableSlot *tts);
static CStoreFdwOptions * CStoreGetOptions(Oid foreignTableId);
static CStoreOptions * CStoreGetOptions(Oid foreignTableId);
static char * CStoreGetOptionValue(Oid foreignTableId, const char *optionName);
static void ValidateForeignTableOptions(char *filename, char *compressionTypeString,
char *stripeRowCountString,
char *blockRowCountString);
static char * CStoreDefaultFilePath(Oid foreignTableId);
static CompressionType ParseCompressionType(const char *compressionTypeString);
static void CStoreGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel,
Oid foreignTableId);
static void CStoreGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel,
@ -160,9 +154,6 @@ static bool CStoreIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
RangeTblEntry *rte);
#endif
/* declarations for dynamic loading */
PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(cstore_ddl_event_end_trigger);
PG_FUNCTION_INFO_V1(cstore_table_size);
PG_FUNCTION_INFO_V1(cstore_fdw_handler);
@ -175,11 +166,11 @@ static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;
/*
* _PG_init is called when the module is loaded. In this function we save the
* Called when the module is loaded. In this function we save the
* previous utility hook, and then install our hook to pre-intercept calls to
* the copy command.
*/
void _PG_init(void)
void cstore_fdw_init()
{
PreviousProcessUtilityHook = ProcessUtility_hook;
ProcessUtility_hook = CStoreProcessUtility;
@ -187,10 +178,10 @@ void _PG_init(void)
/*
* _PG_fini is called when the module is unloaded. This function uninstalls the
* Called when the module is unloaded. This function uninstalls the
* extension's hooks.
*/
void _PG_fini(void)
void cstore_fdw_finish()
{
ProcessUtility_hook = PreviousProcessUtilityHook;
}
@ -249,7 +240,7 @@ cstore_ddl_event_end_trigger(PG_FUNCTION_ARGS)
*/
CreateCStoreDatabaseDirectory(MyDatabaseId);
InitializeCStoreTableFile(relationId, relation);
InitializeCStoreTableFile(relationId, relation, CStoreGetOptions(relationId));
heap_close(relation, AccessExclusiveLock);
}
}
@ -525,7 +516,7 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString)
Datum *columnValues = NULL;
bool *columnNulls = NULL;
TableWriteState *writeState = NULL;
CStoreFdwOptions *cstoreFdwOptions = NULL;
CStoreOptions *cstoreOptions = NULL;
MemoryContext tupleContext = NULL;
/* Only superuser can copy from or to local file */
@ -546,7 +537,7 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString)
columnValues = palloc0(columnCount * sizeof(Datum));
columnNulls = palloc0(columnCount * sizeof(bool));
cstoreFdwOptions = CStoreGetOptions(relationId);
cstoreOptions = CStoreGetOptions(relationId);
/*
* We create a new memory context called tuple context, and read and write
@ -580,10 +571,10 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString)
#endif
/* init state to write to the cstore file */
writeState = CStoreBeginWrite(cstoreFdwOptions->filename,
cstoreFdwOptions->compressionType,
cstoreFdwOptions->stripeRowCount,
cstoreFdwOptions->blockRowCount,
writeState = CStoreBeginWrite(cstoreOptions->filename,
cstoreOptions->compressionType,
cstoreOptions->stripeRowCount,
cstoreOptions->blockRowCount,
tupleDescriptor);
while (nextRowFound)
@ -765,7 +756,7 @@ DroppedCStoreFilenameList(DropStmt *dropStatement)
Oid relationId = RangeVarGetRelid(rangeVar, AccessShareLock, true);
if (CStoreTable(relationId))
{
CStoreFdwOptions *cstoreFdwOptions = CStoreGetOptions(relationId);
CStoreOptions *cstoreOptions = CStoreGetOptions(relationId);
char *defaultfilename = CStoreDefaultFilePath(relationId);
/*
@ -773,13 +764,13 @@ DroppedCStoreFilenameList(DropStmt *dropStatement)
* by sql drop trigger. Both paths are generated by code, use
* of strcmp is safe here.
*/
if (strcmp(defaultfilename, cstoreFdwOptions->filename) == 0)
if (strcmp(defaultfilename, cstoreOptions->filename) == 0)
{
continue;
}
droppedCStoreFileList = lappend(droppedCStoreFileList,
cstoreFdwOptions->filename);
cstoreOptions->filename);
}
}
}
@ -857,13 +848,13 @@ TruncateCStoreTables(List *cstoreRelationList)
{
Relation relation = (Relation) lfirst(relationCell);
Oid relationId = relation->rd_id;
CStoreFdwOptions *cstoreFdwOptions = NULL;
CStoreOptions *cstoreOptions = NULL;
Assert(CStoreTable(relationId));
cstoreFdwOptions = CStoreGetOptions(relationId);
DeleteCStoreTableFiles(cstoreFdwOptions->filename);
InitializeCStoreTableFile(relationId, relation);
cstoreOptions = CStoreGetOptions(relationId);
DeleteCStoreTableFiles(cstoreOptions->filename);
InitializeCStoreTableFile(relationId, relation, CStoreGetOptions(relationId));
}
}
@ -901,29 +892,6 @@ DeleteCStoreTableFiles(char *filename)
}
/*
* InitializeCStoreTableFile creates data and footer file for a cstore table.
* The function assumes data and footer files do not exist, therefore
* it should be called on empty or non-existing table. Notice that the caller
* is expected to acquire AccessExclusiveLock on the relation.
*/
static void InitializeCStoreTableFile(Oid relationId, Relation relation)
{
TableWriteState *writeState = NULL;
TupleDesc tupleDescriptor = RelationGetDescr(relation);
CStoreFdwOptions* cstoreFdwOptions = CStoreGetOptions(relationId);
/*
* Initialize state to write to the cstore file. This creates an
* empty data file and a valid footer file for the table.
*/
writeState = CStoreBeginWrite(cstoreFdwOptions->filename,
cstoreFdwOptions->compressionType, cstoreFdwOptions->stripeRowCount,
cstoreFdwOptions->blockRowCount, tupleDescriptor);
CStoreEndWrite(writeState);
}
/*
* CStoreTable checks if the given table name belongs to a foreign columnar store
@ -1045,111 +1013,7 @@ DistributedWorkerCopy(CopyStmt *copyStatement)
}
/*
* CreateCStoreDatabaseDirectory creates the directory (and parent directories,
* if needed) used to store automatically managed cstore_fdw files. The path to
* the directory is $PGDATA/cstore_fdw/{databaseOid}.
*/
static void
CreateCStoreDatabaseDirectory(Oid databaseOid)
{
bool cstoreDirectoryExists = false;
bool databaseDirectoryExists = false;
StringInfo cstoreDatabaseDirectoryPath = NULL;
StringInfo cstoreDirectoryPath = makeStringInfo();
appendStringInfo(cstoreDirectoryPath, "%s/%s", DataDir, CSTORE_FDW_NAME);
cstoreDirectoryExists = DirectoryExists(cstoreDirectoryPath);
if (!cstoreDirectoryExists)
{
CreateDirectory(cstoreDirectoryPath);
}
cstoreDatabaseDirectoryPath = makeStringInfo();
appendStringInfo(cstoreDatabaseDirectoryPath, "%s/%s/%u", DataDir,
CSTORE_FDW_NAME, databaseOid);
databaseDirectoryExists = DirectoryExists(cstoreDatabaseDirectoryPath);
if (!databaseDirectoryExists)
{
CreateDirectory(cstoreDatabaseDirectoryPath);
}
}
/* DirectoryExists checks if a directory exists for the given directory name. */
static bool
DirectoryExists(StringInfo directoryName)
{
bool directoryExists = true;
struct stat directoryStat;
int statOK = stat(directoryName->data, &directoryStat);
if (statOK == 0)
{
/* file already exists; check that it is a directory */
if (!S_ISDIR(directoryStat.st_mode))
{
ereport(ERROR, (errmsg("\"%s\" is not a directory", directoryName->data),
errhint("You need to remove or rename the file \"%s\".",
directoryName->data)));
}
}
else
{
if (errno == ENOENT)
{
directoryExists = false;
}
else
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not stat directory \"%s\": %m",
directoryName->data)));
}
}
return directoryExists;
}
/* CreateDirectory creates a new directory with the given directory name. */
static void
CreateDirectory(StringInfo directoryName)
{
int makeOK = mkdir(directoryName->data, S_IRWXU);
if (makeOK != 0)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not create directory \"%s\": %m",
directoryName->data)));
}
}
/*
* RemoveCStoreDatabaseDirectory removes CStore directory previously
* created for this database.
* However it does not remove 'cstore_fdw' directory even if there
* are no other databases left.
*/
static void
RemoveCStoreDatabaseDirectory(Oid databaseOid)
{
StringInfo cstoreDirectoryPath = makeStringInfo();
StringInfo cstoreDatabaseDirectoryPath = makeStringInfo();
appendStringInfo(cstoreDirectoryPath, "%s/%s", DataDir, CSTORE_FDW_NAME);
appendStringInfo(cstoreDatabaseDirectoryPath, "%s/%s/%u", DataDir,
CSTORE_FDW_NAME, databaseOid);
if (DirectoryExists(cstoreDatabaseDirectoryPath))
{
rmtree(cstoreDatabaseDirectoryPath->data, true);
}
}
/*
@ -1162,7 +1026,7 @@ cstore_table_size(PG_FUNCTION_ARGS)
Oid relationId = PG_GETARG_OID(0);
int64 tableSize = 0;
CStoreFdwOptions *cstoreFdwOptions = NULL;
CStoreOptions *cstoreOptions = NULL;
char *dataFilename = NULL;
StringInfo footerFilename = NULL;
int dataFileStatResult = 0;
@ -1176,8 +1040,8 @@ cstore_table_size(PG_FUNCTION_ARGS)
ereport(ERROR, (errmsg("relation is not a cstore table")));
}
cstoreFdwOptions = CStoreGetOptions(relationId);
dataFilename = cstoreFdwOptions->filename;
cstoreOptions = CStoreGetOptions(relationId);
dataFilename = cstoreOptions->filename;
dataFileStatResult = stat(dataFilename, &dataFileStatBuffer);
if (dataFileStatResult != 0)
@ -1402,10 +1266,10 @@ GetSlotHeapTuple(TupleTableSlot *tts)
* foreign table, and if not present, falls back to default values. This function
* errors out if given option values are considered invalid.
*/
static CStoreFdwOptions *
static CStoreOptions *
CStoreGetOptions(Oid foreignTableId)
{
CStoreFdwOptions *cstoreFdwOptions = NULL;
CStoreOptions *cstoreOptions = NULL;
char *filename = NULL;
CompressionType compressionType = DEFAULT_COMPRESSION_TYPE;
int32 stripeRowCount = DEFAULT_STRIPE_ROW_COUNT;
@ -1445,13 +1309,13 @@ CStoreGetOptions(Oid foreignTableId)
filename = CStoreDefaultFilePath(foreignTableId);
}
cstoreFdwOptions = palloc0(sizeof(CStoreFdwOptions));
cstoreFdwOptions->filename = filename;
cstoreFdwOptions->compressionType = compressionType;
cstoreFdwOptions->stripeRowCount = stripeRowCount;
cstoreFdwOptions->blockRowCount = blockRowCount;
cstoreOptions = palloc0(sizeof(CStoreOptions));
cstoreOptions->filename = filename;
cstoreOptions->compressionType = compressionType;
cstoreOptions->stripeRowCount = stripeRowCount;
cstoreOptions->blockRowCount = blockRowCount;
return cstoreFdwOptions;
return cstoreOptions;
}
@ -1577,26 +1441,6 @@ CStoreDefaultFilePath(Oid foreignTableId)
}
/* ParseCompressionType converts a string to a compression type. */
static CompressionType
ParseCompressionType(const char *compressionTypeString)
{
CompressionType compressionType = COMPRESSION_TYPE_INVALID;
Assert(compressionTypeString != NULL);
if (strncmp(compressionTypeString, COMPRESSION_STRING_NONE, NAMEDATALEN) == 0)
{
compressionType = COMPRESSION_NONE;
}
else if (strncmp(compressionTypeString, COMPRESSION_STRING_PG_LZ, NAMEDATALEN) == 0)
{
compressionType = COMPRESSION_PG_LZ;
}
return compressionType;
}
/*
* CStoreGetForeignRelSize obtains relation size estimates for a foreign table and
* puts its estimate for row count into baserel->rows.
@ -1604,8 +1448,8 @@ ParseCompressionType(const char *compressionTypeString)
static void
CStoreGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId)
{
CStoreFdwOptions *cstoreFdwOptions = CStoreGetOptions(foreignTableId);
double tupleCountEstimate = TupleCountEstimate(baserel, cstoreFdwOptions->filename);
CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId);
double tupleCountEstimate = TupleCountEstimate(baserel, cstoreOptions->filename);
double rowSelectivity = clauselist_selectivity(root, baserel->baserestrictinfo,
0, JOIN_INNER, NULL);
@ -1624,7 +1468,7 @@ static void
CStoreGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId)
{
Path *foreignScanPath = NULL;
CStoreFdwOptions *cstoreFdwOptions = CStoreGetOptions(foreignTableId);
CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId);
Relation relation = heap_open(foreignTableId, AccessShareLock);
/*
@ -1645,14 +1489,14 @@ CStoreGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId
*/
List *queryColumnList = ColumnList(baserel, foreignTableId);
uint32 queryColumnCount = list_length(queryColumnList);
BlockNumber relationPageCount = PageCount(cstoreFdwOptions->filename);
BlockNumber relationPageCount = PageCount(cstoreOptions->filename);
uint32 relationColumnCount = RelationGetNumberOfAttributes(relation);
double queryColumnRatio = (double) queryColumnCount / relationColumnCount;
double queryPageCount = relationPageCount * queryColumnRatio;
double totalDiskAccessCost = seq_page_cost * queryPageCount;
double tupleCountEstimate = TupleCountEstimate(baserel, cstoreFdwOptions->filename);
double tupleCountEstimate = TupleCountEstimate(baserel, cstoreOptions->filename);
/*
* We estimate costs almost the same way as cost_seqscan(), thus assuming
@ -1922,16 +1766,16 @@ static void
CStoreExplainForeignScan(ForeignScanState *scanState, ExplainState *explainState)
{
Oid foreignTableId = RelationGetRelid(scanState->ss.ss_currentRelation);
CStoreFdwOptions *cstoreFdwOptions = CStoreGetOptions(foreignTableId);
CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId);
ExplainPropertyText("CStore File", cstoreFdwOptions->filename, explainState);
ExplainPropertyText("CStore File", cstoreOptions->filename, explainState);
/* suppress file size if we're not showing cost details */
if (explainState->costs)
{
struct stat statBuffer;
int statResult = stat(cstoreFdwOptions->filename, &statBuffer);
int statResult = stat(cstoreOptions->filename, &statBuffer);
if (statResult == 0)
{
ExplainPropertyLong("CStore File Size", (long) statBuffer.st_size,
@ -1947,7 +1791,7 @@ CStoreBeginForeignScan(ForeignScanState *scanState, int executorFlags)
{
TableReadState *readState = NULL;
Oid foreignTableId = InvalidOid;
CStoreFdwOptions *cstoreFdwOptions = NULL;
CStoreOptions *cstoreOptions = NULL;
Relation currentRelation = scanState->ss.ss_currentRelation;
TupleDesc tupleDescriptor = RelationGetDescr(currentRelation);
List *columnList = NIL;
@ -1962,14 +1806,14 @@ CStoreBeginForeignScan(ForeignScanState *scanState, int executorFlags)
}
foreignTableId = RelationGetRelid(scanState->ss.ss_currentRelation);
cstoreFdwOptions = CStoreGetOptions(foreignTableId);
cstoreOptions = CStoreGetOptions(foreignTableId);
foreignScan = (ForeignScan *) scanState->ss.ps.plan;
foreignPrivateList = (List *) foreignScan->fdw_private;
whereClauseList = foreignScan->scan.plan.qual;
columnList = (List *) linitial(foreignPrivateList);
readState = CStoreBeginRead(cstoreFdwOptions->filename, tupleDescriptor,
readState = CStoreBeginRead(cstoreOptions->filename, tupleDescriptor,
columnList, whereClauseList);
scanState->fdw_state = (void *) readState;
@ -2040,18 +1884,18 @@ CStoreAnalyzeForeignTable(Relation relation,
BlockNumber *totalPageCount)
{
Oid foreignTableId = RelationGetRelid(relation);
CStoreFdwOptions *cstoreFdwOptions = CStoreGetOptions(foreignTableId);
CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId);
struct stat statBuffer;
int statResult = stat(cstoreFdwOptions->filename, &statBuffer);
int statResult = stat(cstoreOptions->filename, &statBuffer);
if (statResult < 0)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not stat file \"%s\": %m",
cstoreFdwOptions->filename)));
cstoreOptions->filename)));
}
(*totalPageCount) = PageCount(cstoreFdwOptions->filename);
(*totalPageCount) = PageCount(cstoreOptions->filename);
(*acquireSampleRowsFunc) = CStoreAcquireSampleRows;
return true;
@ -2311,20 +2155,20 @@ static void
CStoreBeginForeignInsert(ModifyTableState *modifyTableState, ResultRelInfo *relationInfo)
{
Oid foreignTableOid = InvalidOid;
CStoreFdwOptions *cstoreFdwOptions = NULL;
CStoreOptions *cstoreOptions = NULL;
TupleDesc tupleDescriptor = NULL;
TableWriteState *writeState = NULL;
Relation relation = NULL;
foreignTableOid = RelationGetRelid(relationInfo->ri_RelationDesc);
relation = heap_open(foreignTableOid, ShareUpdateExclusiveLock);
cstoreFdwOptions = CStoreGetOptions(foreignTableOid);
cstoreOptions = CStoreGetOptions(foreignTableOid);
tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc);
writeState = CStoreBeginWrite(cstoreFdwOptions->filename,
cstoreFdwOptions->compressionType,
cstoreFdwOptions->stripeRowCount,
cstoreFdwOptions->blockRowCount,
writeState = CStoreBeginWrite(cstoreOptions->filename,
cstoreOptions->compressionType,
cstoreOptions->stripeRowCount,
cstoreOptions->blockRowCount,
tupleDescriptor);
writeState->relation = relation;

View File

@ -22,41 +22,7 @@
#include "lib/stringinfo.h"
#include "utils/rel.h"
/*
 * Defines for valid option names. There are four names here, the same number
 * as ValidOptionCount; presumably these are the entries of ValidOptionArray.
 */
#define OPTION_NAME_FILENAME "filename"
#define OPTION_NAME_COMPRESSION_TYPE "compression"
#define OPTION_NAME_STRIPE_ROW_COUNT "stripe_row_count"
#define OPTION_NAME_BLOCK_ROW_COUNT "block_row_count"
/*
 * Default values for option parameters. The default compression type is
 * COMPRESSION_NONE, i.e. the value ParseCompressionType yields for "none".
 */
#define DEFAULT_COMPRESSION_TYPE COMPRESSION_NONE
#define DEFAULT_STRIPE_ROW_COUNT 150000
#define DEFAULT_BLOCK_ROW_COUNT 10000
/*
 * Limits for option parameters. Both row-count defaults above fall inside
 * these ranges. NOTE(review): whether the bounds are inclusive is decided by
 * the option validator, which is not visible here -- confirm.
 */
#define STRIPE_ROW_COUNT_MINIMUM 1000
#define STRIPE_ROW_COUNT_MAXIMUM 10000000
#define BLOCK_ROW_COUNT_MINIMUM 1000
#define BLOCK_ROW_COUNT_MAXIMUM 100000
/*
 * String representations of compression types. ParseCompressionType compares
 * its argument against the first two. COMPRESSION_STRING_DELIMITED_LIST is a
 * separate literal spelling out both strings, so it has to be updated by hand
 * whenever a compression string is added or renamed.
 */
#define COMPRESSION_STRING_NONE "none"
#define COMPRESSION_STRING_PG_LZ "pglz"
#define COMPRESSION_STRING_DELIMITED_LIST "none, pglz"
/* CStore file signature */
#define CSTORE_MAGIC_NUMBER "citus_cstore"
#define CSTORE_VERSION_MAJOR 1
#define CSTORE_VERSION_MINOR 7
/*
 * miscellaneous defines
 *
 * NOTE(review): the two *_FILE_SUFFIX values are presumably appended to the
 * table filename to name the footer file and a temporary file; the uses are
 * not visible here -- confirm.
 */
#define CSTORE_FDW_NAME "cstore_fdw"
#define CSTORE_FOOTER_FILE_SUFFIX ".footer"
#define CSTORE_TEMP_FILE_SUFFIX ".tmp"
#define CSTORE_TUPLE_COST_MULTIPLIER 10
#define CSTORE_POSTSCRIPT_SIZE_LENGTH 1
#define CSTORE_POSTSCRIPT_SIZE_MAX 256
#include "cstore.h"
/* table containing information about how to partition distributed tables */
#define CITUS_EXTENSION_NAME "citus"
@ -67,7 +33,6 @@
#define ATTR_NUM_PARTITION_TYPE 2
#define ATTR_NUM_PARTITION_KEY 3
/*
* CStoreValidOption keeps an option name and a context. When an option is passed
* into cstore_fdw objects (server and foreign table), we compare this option's
@ -80,6 +45,7 @@ typedef struct CStoreValidOption
} CStoreValidOption;
#define COMPRESSION_STRING_DELIMITED_LIST "none, pglz"
/* Array of options that are valid for cstore_fdw */
static const uint32 ValidOptionCount = 4;
@ -92,220 +58,8 @@ static const CStoreValidOption ValidOptionArray[] =
{ OPTION_NAME_BLOCK_ROW_COUNT, ForeignTableRelationId }
};
/* Enumeration for cstore file's compression method */
typedef enum
{
/* what ParseCompressionType returns when the string matches no known type */
COMPRESSION_TYPE_INVALID = -1,
/* what ParseCompressionType returns for COMPRESSION_STRING_NONE */
COMPRESSION_NONE = 0,
/* what ParseCompressionType returns for COMPRESSION_STRING_PG_LZ */
COMPRESSION_PG_LZ = 1,
/* not a compression type: follows the last real entry, so its value is 2 */
COMPRESSION_COUNT
} CompressionType;
/*
 * CStoreFdwOptions holds the option values to be used when reading or writing
 * a cstore file. To resolve these values, we first check foreign table's options,
 * and if not present, we then fall back to the default values specified above.
 *
 * CStoreBeginForeignInsert passes all four members, in declaration order, as
 * the leading arguments of CStoreBeginWrite.
 */
typedef struct CStoreFdwOptions
{
/* name of the cstore file; becomes CStoreBeginWrite's filename argument */
char *filename;
/* becomes CStoreBeginWrite's compressionType argument */
CompressionType compressionType;
/* becomes CStoreBeginWrite's stripeMaxRowCount argument */
uint64 stripeRowCount;
/* becomes CStoreBeginWrite's blockRowCount argument */
uint32 blockRowCount;
} CStoreFdwOptions;
/*
 * StripeMetadata represents information about a stripe. This information is
 * stored in the cstore file's footer.
 *
 * NOTE(review): the member descriptions below are inferred from the member
 * names only; confirm against the reader and writer code.
 */
typedef struct StripeMetadata
{
/* presumably the position in the table file at which the stripe begins */
uint64 fileOffset;
/*
 * presumably the sizes of the stripe's skip list, data and footer sections
 */
uint64 skipListLength;
uint64 dataLength;
uint64 footerLength;
} StripeMetadata;
/* TableFooter represents the footer of a cstore file. */
typedef struct TableFooter
{
/* per-stripe metadata; presumably a List of StripeMetadata pointers -- TODO confirm */
List *stripeMetadataList;
/*
 * NOTE(review): declared uint64 here, whereas CStoreFdwOptions.blockRowCount
 * and the blockRowCount parameter of CStoreBeginWrite are uint32.
 */
uint64 blockRowCount;
} TableFooter;
/* ColumnBlockSkipNode contains statistics for a ColumnBlockData. */
typedef struct ColumnBlockSkipNode
{
/*
 * statistics about values of a column block
 *
 * NOTE(review): minimumValue and maximumValue are presumably meaningful only
 * when hasMinMax is true -- confirm against the writer.
 */
bool hasMinMax;
Datum minimumValue;
Datum maximumValue;
uint64 rowCount;
/*
 * Offsets and sizes of value and exists streams in the column data.
 * These enable us to skip reading suppressed row blocks, and start reading
 * a block without reading previous blocks.
 */
uint64 valueBlockOffset;
uint64 valueLength;
uint64 existsBlockOffset;
uint64 existsLength;
/* presumably the compression type of this block's value stream -- TODO confirm */
CompressionType valueCompressionType;
} ColumnBlockSkipNode;
/*
 * StripeSkipList can be used for skipping row blocks. It contains a column block
 * skip node for each block of each column. blockSkipNodeArray[column][block]
 * is the entry for the specified column block.
 */
typedef struct StripeSkipList
{
/* indexed first by column, then by block, as described above */
ColumnBlockSkipNode **blockSkipNodeArray;
/* presumably the extents of the two index dimensions above -- TODO confirm */
uint32 columnCount;
uint32 blockCount;
} StripeSkipList;
/*
 * ColumnBlockData represents a block of data in a column. valueArray stores
 * the values of data, and existsArray stores whether a value is present.
 * valueBuffer is used to store (uncompressed) serialized values
 * referenced by Datum's in valueArray. It is only used for by-reference Datum's.
 * There is a one-to-one correspondence between valueArray and existsArray.
 */
typedef struct ColumnBlockData
{
/* existsArray[i] tells whether valueArray[i] holds a value */
bool *existsArray;
/* the values themselves; by-reference entries point into valueBuffer */
Datum *valueArray;
/* valueBuffer keeps actual data for type-by-reference datums from valueArray. */
StringInfo valueBuffer;
} ColumnBlockData;
/*
 * ColumnBlockBuffers represents a block of serialized data in a column.
 * valueBuffer stores the serialized values of data, and existsBuffer stores
 * serialized value of presence information. valueCompressionType contains
 * compression type if valueBuffer is compressed.
 *
 * NOTE(review): earlier wording of this comment described a rowCount member
 * holding the number of rows in the block. No such member is declared here;
 * the row counts visible in this header are ColumnBlockSkipNode.rowCount and
 * StripeBuffers.rowCount.
 */
typedef struct ColumnBlockBuffers
{
/* serialized presence information */
StringInfo existsBuffer;
/* serialized values */
StringInfo valueBuffer;
/* compression type of valueBuffer, if it is compressed */
CompressionType valueCompressionType;
} ColumnBlockBuffers;
/*
 * ColumnBuffers represents data buffers for a column in a row stripe. Each
 * column is made of multiple column blocks.
 */
typedef struct ColumnBuffers
{
/*
 * One ColumnBlockBuffers pointer per column block. The struct carries no
 * length of its own; presumably the block count is tracked by the owner
 * (e.g. StripeSkipList.blockCount) -- TODO confirm.
 */
ColumnBlockBuffers **blockBuffersArray;
} ColumnBuffers;
/* StripeBuffers represents data for a row stripe in a cstore file. */
typedef struct StripeBuffers
{
/* presumably the number of entries in columnBuffersArray -- TODO confirm */
uint32 columnCount;
/* presumably the number of rows held by the stripe -- TODO confirm */
uint32 rowCount;
/* per-column buffers of the stripe */
ColumnBuffers **columnBuffersArray;
} StripeBuffers;
/*
 * StripeFooter represents a stripe's footer. In this footer, we keep three
 * arrays of sizes. The number of elements in each of the arrays is equal
 * to the number of columns.
 */
typedef struct StripeFooter
{
/* number of columns, and therefore the length of each array below */
uint32 columnCount;
/*
 * The three per-column size arrays. NOTE(review): by their names they hold
 * the sizes of each column's skip list, exists stream and value stream --
 * confirm against the serialization code.
 */
uint64 *skipListSizeArray;
uint64 *existsSizeArray;
uint64 *valueSizeArray;
} StripeFooter;
/* TableReadState represents state of a cstore file read operation. */
typedef struct TableReadState
{
/* stdio stream of the table file */
FILE *tableFile;
/* footer of the table; presumably obtained via CStoreReadFooter -- TODO confirm */
TableFooter *tableFooter;
/* presumably the tuple descriptor passed to CStoreBeginRead -- TODO confirm */
TupleDesc tupleDescriptor;
/*
 * List of Var pointers for columns in the query. We use this both for
 * getting vector of projected columns, and also when we want to build
 * base constraint to find selected row blocks.
 */
List *projectedColumnList;
/* presumably the qualConditions list passed to CStoreBeginRead -- TODO confirm */
List *whereClauseList;
/* NOTE(review): by its name, the memory context for per-stripe allocations -- confirm */
MemoryContext stripeReadContext;
StripeBuffers *stripeBuffers;
/* presumably: stripes read so far, and rows read from the current stripe -- TODO confirm */
uint32 readStripeCount;
uint64 stripeReadRowCount;
ColumnBlockData **blockDataArray;
/*
 * NOTE(review): signed, unlike the counters above; presumably a negative
 * value marks "no block deserialized yet" -- confirm.
 */
int32 deserializedBlockIndex;
} TableReadState;
/* TableWriteState represents state of a cstore file write operation. */
typedef struct TableWriteState
{
/* stdio stream of the table file */
FILE *tableFile;
TableFooter *tableFooter;
/* name of the footer file; presumably the table filename plus CSTORE_FOOTER_FILE_SUFFIX -- TODO confirm */
StringInfo tableFooterFilename;
CompressionType compressionType;
TupleDesc tupleDescriptor;
/* NOTE(review): presumably one comparison function per column, used for min/max statistics -- confirm */
FmgrInfo **comparisonFunctionArray;
uint64 currentFileOffset;
/*
 * Not filled in through CStoreBeginWrite's arguments: CStoreBeginForeignInsert
 * assigns the relation it opened with heap_open() after CStoreBeginWrite
 * returns.
 */
Relation relation;
MemoryContext stripeWriteContext;
StripeBuffers *stripeBuffers;
StripeSkipList *stripeSkipList;
/*
 * NOTE(review): uint32 here, while CStoreBeginWrite declares its
 * stripeMaxRowCount parameter as uint64. STRIPE_ROW_COUNT_MAXIMUM (10000000)
 * fits in 32 bits, so in-range option values are not truncated.
 */
uint32 stripeMaxRowCount;
ColumnBlockData **blockDataArray;
/*
 * compressionBuffer buffer is used as temporary storage during
 * data value compression operation. It is kept here to minimize
 * memory allocations. It lives in stripeWriteContext and gets
 * deallocated when memory context is reset.
 */
StringInfo compressionBuffer;
} TableWriteState;
/* Function declarations for extension loading and unloading */
extern void _PG_init(void);
extern void _PG_fini(void);
void cstore_fdw_init(void);
void cstore_fdw_finish(void);
/* event trigger function declarations */
extern Datum cstore_ddl_event_end_trigger(PG_FUNCTION_ARGS);
@ -318,36 +72,4 @@ extern Datum cstore_clean_table_resources(PG_FUNCTION_ARGS);
extern Datum cstore_fdw_handler(PG_FUNCTION_ARGS);
extern Datum cstore_fdw_validator(PG_FUNCTION_ARGS);
/* Function declarations for writing to a cstore file */
extern TableWriteState * CStoreBeginWrite(const char *filename,
CompressionType compressionType,
uint64 stripeMaxRowCount,
uint32 blockRowCount,
TupleDesc tupleDescriptor);
extern void CStoreWriteRow(TableWriteState *state, Datum *columnValues,
bool *columnNulls);
extern void CStoreEndWrite(TableWriteState * state);
/* Function declarations for reading from a cstore file */
extern TableReadState * CStoreBeginRead(const char *filename, TupleDesc tupleDescriptor,
List *projectedColumnList, List *qualConditions);
extern TableFooter * CStoreReadFooter(StringInfo tableFooterFilename);
extern bool CStoreReadFinished(TableReadState *state);
extern bool CStoreReadNextRow(TableReadState *state, Datum *columnValues,
bool *columnNulls);
extern void CStoreEndRead(TableReadState *state);
/* Function declarations for common functions */
extern FmgrInfo * GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId,
int16 procedureId);
extern ColumnBlockData ** CreateEmptyBlockDataArray(uint32 columnCount, bool *columnMask,
uint32 blockRowCount);
extern void FreeColumnBlockDataArray(ColumnBlockData **blockDataArray,
uint32 columnCount);
extern uint64 CStoreTableRowCount(const char *filename);
extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer,
CompressionType compressionType);
extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType);
#endif /* CSTORE_FDW_H */

30
mod.c Normal file
View File

@ -0,0 +1,30 @@
/*-------------------------------------------------------------------------
*
* mod.c
*
* This file contains module-level definitions.
*
* Copyright (c) 2016, Citus Data, Inc.
*
* $Id$
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "cstore_fdw.h"
PG_MODULE_MAGIC;
/*
 * _PG_init is the extension-loading entry point of the module. All of the
 * work is delegated to cstore_fdw_init().
 */
void
_PG_init(void)
{
	cstore_fdw_init();
}
/*
 * _PG_fini is the extension-unloading entry point of the module. All of the
 * work is delegated to cstore_fdw_finish().
 *
 * NOTE(review): PostgreSQL's documentation has long stated that library
 * unloading is disabled, so this may never be invoked -- confirm for the
 * server versions being targeted.
 */
void
_PG_fini(void)
{
	cstore_fdw_finish();
}