Merge pull request #6 from citusdata/smgr

Smgr
merge-cstore-pykello
jeff-davis 2020-09-14 13:52:25 -07:00 committed by GitHub
commit c570932712
17 changed files with 213 additions and 727 deletions

132
cstore.c
View File

@ -21,9 +21,6 @@
#include "cstore.h" #include "cstore.h"
static void CreateDirectory(StringInfo directoryName);
static bool DirectoryExists(StringInfo directoryName);
/* ParseCompressionType converts a string to a compression type. */ /* ParseCompressionType converts a string to a compression type. */
CompressionType CompressionType
ParseCompressionType(const char *compressionTypeString) ParseCompressionType(const char *compressionTypeString)
@ -44,80 +41,6 @@ ParseCompressionType(const char *compressionTypeString)
} }
/* CreateDirectory creates a new directory with the given directory name. */
static void
CreateDirectory(StringInfo directoryName)
{
int makeOK = mkdir(directoryName->data, S_IRWXU);
if (makeOK != 0)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not create directory \"%s\": %m",
directoryName->data)));
}
}
/* DirectoryExists checks if a directory exists for the given directory name. */
static bool
DirectoryExists(StringInfo directoryName)
{
bool directoryExists = true;
struct stat directoryStat;
int statOK = stat(directoryName->data, &directoryStat);
if (statOK == 0)
{
/* file already exists; check that it is a directory */
if (!S_ISDIR(directoryStat.st_mode))
{
ereport(ERROR, (errmsg("\"%s\" is not a directory", directoryName->data),
errhint("You need to remove or rename the file \"%s\".",
directoryName->data)));
}
}
else
{
if (errno == ENOENT)
{
directoryExists = false;
}
else
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not stat directory \"%s\": %m",
directoryName->data)));
}
}
return directoryExists;
}
/*
* RemoveCStoreDatabaseDirectory removes CStore directory previously
* created for this database.
* However it does not remove 'cstore_fdw' directory even if there
* are no other databases left.
*/
void
RemoveCStoreDatabaseDirectory(Oid databaseOid)
{
StringInfo cstoreDirectoryPath = makeStringInfo();
StringInfo cstoreDatabaseDirectoryPath = makeStringInfo();
appendStringInfo(cstoreDirectoryPath, "%s/%s", DataDir, CSTORE_FDW_NAME);
appendStringInfo(cstoreDatabaseDirectoryPath, "%s/%s/%u", DataDir,
CSTORE_FDW_NAME, databaseOid);
if (DirectoryExists(cstoreDatabaseDirectoryPath))
{
rmtree(cstoreDatabaseDirectoryPath->data, true);
}
}
/* /*
* InitializeCStoreTableFile creates data and footer file for a cstore table. * InitializeCStoreTableFile creates data and footer file for a cstore table.
* The function assumes data and footer files do not exist, therefore * The function assumes data and footer files do not exist, therefore
@ -136,62 +59,9 @@ InitializeCStoreTableFile(Oid relationId, Relation relation, CStoreOptions *csto
* Initialize state to write to the cstore file. This creates an * Initialize state to write to the cstore file. This creates an
* empty data file and a valid footer file for the table. * empty data file and a valid footer file for the table.
*/ */
writeState = CStoreBeginWrite(relationId, cstoreOptions->filename, writeState = CStoreBeginWrite(relationId,
cstoreOptions->compressionType, cstoreOptions->compressionType,
cstoreOptions->stripeRowCount, cstoreOptions->stripeRowCount,
cstoreOptions->blockRowCount, tupleDescriptor); cstoreOptions->blockRowCount, tupleDescriptor);
CStoreEndWrite(writeState); CStoreEndWrite(writeState);
} }
/*
* CreateCStoreDatabaseDirectory creates the directory (and parent directories,
* if needed) used to store automatically managed cstore_fdw files. The path to
* the directory is $PGDATA/cstore_fdw/{databaseOid}.
*/
void
CreateCStoreDatabaseDirectory(Oid databaseOid)
{
bool cstoreDirectoryExists = false;
bool databaseDirectoryExists = false;
StringInfo cstoreDatabaseDirectoryPath = NULL;
StringInfo cstoreDirectoryPath = makeStringInfo();
appendStringInfo(cstoreDirectoryPath, "%s/%s", DataDir, CSTORE_FDW_NAME);
cstoreDirectoryExists = DirectoryExists(cstoreDirectoryPath);
if (!cstoreDirectoryExists)
{
CreateDirectory(cstoreDirectoryPath);
}
cstoreDatabaseDirectoryPath = makeStringInfo();
appendStringInfo(cstoreDatabaseDirectoryPath, "%s/%s/%u", DataDir,
CSTORE_FDW_NAME, databaseOid);
databaseDirectoryExists = DirectoryExists(cstoreDatabaseDirectoryPath);
if (!databaseDirectoryExists)
{
CreateDirectory(cstoreDatabaseDirectoryPath);
}
}
/*
* DeleteCStoreTableFiles deletes the data and footer files for a cstore table
* whose data filename is given.
*/
void
DeleteCStoreTableFiles(char *filename)
{
int dataFileRemoved = 0;
/* delete the data file */
dataFileRemoved = unlink(filename);
if (dataFileRemoved != 0)
{
ereport(WARNING, (errcode_for_file_access(),
errmsg("could not delete file \"%s\": %m",
filename)));
}
}

View File

@ -16,10 +16,10 @@
#include "fmgr.h" #include "fmgr.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "storage/bufpage.h"
#include "utils/relcache.h" #include "utils/relcache.h"
/* Defines for valid option names */ /* Defines for valid option names */
#define OPTION_NAME_FILENAME "filename"
#define OPTION_NAME_COMPRESSION_TYPE "compression" #define OPTION_NAME_COMPRESSION_TYPE "compression"
#define OPTION_NAME_STRIPE_ROW_COUNT "stripe_row_count" #define OPTION_NAME_STRIPE_ROW_COUNT "stripe_row_count"
#define OPTION_NAME_BLOCK_ROW_COUNT "block_row_count" #define OPTION_NAME_BLOCK_ROW_COUNT "block_row_count"
@ -68,7 +68,6 @@ typedef enum
*/ */
typedef struct CStoreOptions typedef struct CStoreOptions
{ {
char *filename;
CompressionType compressionType; CompressionType compressionType;
uint64 stripeRowCount; uint64 stripeRowCount;
uint32 blockRowCount; uint32 blockRowCount;
@ -203,10 +202,9 @@ typedef struct TableReadState
{ {
Oid relationId; Oid relationId;
FILE *tableFile;
TableMetadata *tableMetadata; TableMetadata *tableMetadata;
TupleDesc tupleDescriptor; TupleDesc tupleDescriptor;
Relation relation;
/* /*
* List of Var pointers for columns in the query. We use this both for * List of Var pointers for columns in the query. We use this both for
* getting vector of projected columns, and also when we want to build * getting vector of projected columns, and also when we want to build
@ -228,7 +226,6 @@ typedef struct TableReadState
typedef struct TableWriteState typedef struct TableWriteState
{ {
Oid relationId; Oid relationId;
FILE *tableFile;
TableMetadata *tableMetadata; TableMetadata *tableMetadata;
CompressionType compressionType; CompressionType compressionType;
TupleDesc tupleDescriptor; TupleDesc tupleDescriptor;
@ -257,11 +254,9 @@ extern void InitializeCStoreTableFile(Oid relationId, Relation relation,
CStoreOptions *cstoreOptions); CStoreOptions *cstoreOptions);
extern void CreateCStoreDatabaseDirectory(Oid databaseOid); extern void CreateCStoreDatabaseDirectory(Oid databaseOid);
extern void RemoveCStoreDatabaseDirectory(Oid databaseOid); extern void RemoveCStoreDatabaseDirectory(Oid databaseOid);
extern void DeleteCStoreTableFiles(char *filename);
/* Function declarations for writing to a cstore file */ /* Function declarations for writing to a cstore file */
extern TableWriteState * CStoreBeginWrite(Oid relationId, extern TableWriteState * CStoreBeginWrite(Oid relationId,
const char *filename,
CompressionType compressionType, CompressionType compressionType,
uint64 stripeMaxRowCount, uint64 stripeMaxRowCount,
uint32 blockRowCount, uint32 blockRowCount,
@ -271,7 +266,7 @@ extern void CStoreWriteRow(TableWriteState *state, Datum *columnValues,
extern void CStoreEndWrite(TableWriteState *state); extern void CStoreEndWrite(TableWriteState *state);
/* Function declarations for reading from a cstore file */ /* Function declarations for reading from a cstore file */
extern TableReadState * CStoreBeginRead(Oid relationId, const char *filename, extern TableReadState * CStoreBeginRead(Oid relationId,
TupleDesc tupleDescriptor, TupleDesc tupleDescriptor,
List *projectedColumnList, List *qualConditions); List *projectedColumnList, List *qualConditions);
extern bool CStoreReadFinished(TableReadState *state); extern bool CStoreReadFinished(TableReadState *state);
@ -286,7 +281,7 @@ extern ColumnBlockData ** CreateEmptyBlockDataArray(uint32 columnCount, bool *co
uint32 blockRowCount); uint32 blockRowCount);
extern void FreeColumnBlockDataArray(ColumnBlockData **blockDataArray, extern void FreeColumnBlockDataArray(ColumnBlockData **blockDataArray,
uint32 columnCount); uint32 columnCount);
extern uint64 CStoreTableRowCount(Oid relid, const char *filename); extern uint64 CStoreTableRowCount(Relation relation);
extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer, extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer,
CompressionType compressionType); CompressionType compressionType);
extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType); extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType);
@ -294,8 +289,31 @@ extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressio
/* cstore_metadata_tables.c */ /* cstore_metadata_tables.c */
extern void SaveStripeFooter(Oid relid, uint64 stripe, StripeFooter *footer); extern void SaveStripeFooter(Oid relid, uint64 stripe, StripeFooter *footer);
extern StripeFooter * ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount); extern StripeFooter * ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount);
extern void InitCStoreTableMetadata(Oid relid, int blockRowCount); extern void InitCStoreTableMetadata(Oid relid, int blockRowCount);
extern void InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe); extern void InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe);
extern TableMetadata * ReadTableMetadata(Oid relid); extern TableMetadata * ReadTableMetadata(Oid relid);
typedef struct SmgrAddr
{
BlockNumber blockno;
uint32 offset;
} SmgrAddr;
/*
* Map logical offsets (as tracked in the metadata) to a physical page and
* offset where the data is kept.
*/
static inline SmgrAddr
logical_to_smgr(uint64 logicalOffset)
{
uint64 bytes_per_page = BLCKSZ - SizeOfPageHeaderData;
SmgrAddr addr;
addr.blockno = logicalOffset / bytes_per_page;
addr.offset = SizeOfPageHeaderData + (logicalOffset % bytes_per_page);
return addr;
}
#endif /* CSTORE_H */ #endif /* CSTORE_H */

View File

@ -100,7 +100,6 @@ static const uint32 ValidOptionCount = 4;
static const CStoreValidOption ValidOptionArray[] = static const CStoreValidOption ValidOptionArray[] =
{ {
/* foreign table options */ /* foreign table options */
{ OPTION_NAME_FILENAME, ForeignTableRelationId },
{ OPTION_NAME_COMPRESSION_TYPE, ForeignTableRelationId }, { OPTION_NAME_COMPRESSION_TYPE, ForeignTableRelationId },
{ OPTION_NAME_STRIPE_ROW_COUNT, ForeignTableRelationId }, { OPTION_NAME_STRIPE_ROW_COUNT, ForeignTableRelationId },
{ OPTION_NAME_BLOCK_ROW_COUNT, ForeignTableRelationId } { OPTION_NAME_BLOCK_ROW_COUNT, ForeignTableRelationId }
@ -130,7 +129,7 @@ static void CStoreProcessAlterTableCommand(AlterTableStmt *alterStatement);
static List * DroppedCStoreRelidList(DropStmt *dropStatement); static List * DroppedCStoreRelidList(DropStmt *dropStatement);
static List * FindCStoreTables(List *tableList); static List * FindCStoreTables(List *tableList);
static List * OpenRelationsForTruncate(List *cstoreTableList); static List * OpenRelationsForTruncate(List *cstoreTableList);
static void InitializeRelFileNode(Relation relation); static void InitializeRelFileNode(Relation relation, bool force);
static void TruncateCStoreTables(List *cstoreRelationList); static void TruncateCStoreTables(List *cstoreRelationList);
static bool CStoreTable(Oid relationId); static bool CStoreTable(Oid relationId);
static bool CStoreServer(ForeignServer *server); static bool CStoreServer(ForeignServer *server);
@ -140,10 +139,9 @@ static StringInfo OptionNamesString(Oid currentContextId);
static HeapTuple GetSlotHeapTuple(TupleTableSlot *tts); static HeapTuple GetSlotHeapTuple(TupleTableSlot *tts);
static CStoreOptions * CStoreGetOptions(Oid foreignTableId); static CStoreOptions * CStoreGetOptions(Oid foreignTableId);
static char * CStoreGetOptionValue(Oid foreignTableId, const char *optionName); static char * CStoreGetOptionValue(Oid foreignTableId, const char *optionName);
static void ValidateForeignTableOptions(char *filename, char *compressionTypeString, static void ValidateForeignTableOptions(char *compressionTypeString,
char *stripeRowCountString, char *stripeRowCountString,
char *blockRowCountString); char *blockRowCountString);
static char * CStoreDefaultFilePath(Oid foreignTableId);
static void CStoreGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, static void CStoreGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel,
Oid foreignTableId); Oid foreignTableId);
static void CStoreGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, static void CStoreGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel,
@ -158,8 +156,8 @@ static ForeignScan * CStoreGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel
Oid foreignTableId, ForeignPath *bestPath, Oid foreignTableId, ForeignPath *bestPath,
List *targetList, List *scanClauses); List *targetList, List *scanClauses);
#endif #endif
static double TupleCountEstimate(Oid relid, RelOptInfo *baserel, const char *filename); static double TupleCountEstimate(Relation relation, RelOptInfo *baserel);
static BlockNumber PageCount(const char *filename); static BlockNumber PageCount(Relation relation);
static List * ColumnList(RelOptInfo *baserel, Oid foreignTableId); static List * ColumnList(RelOptInfo *baserel, Oid foreignTableId);
static void CStoreExplainForeignScan(ForeignScanState *scanState, static void CStoreExplainForeignScan(ForeignScanState *scanState,
ExplainState *explainState); ExplainState *explainState);
@ -250,17 +248,7 @@ cstore_ddl_event_end_trigger(PG_FUNCTION_ARGS)
triggerData = (EventTriggerData *) fcinfo->context; triggerData = (EventTriggerData *) fcinfo->context;
parseTree = triggerData->parsetree; parseTree = triggerData->parsetree;
if (nodeTag(parseTree) == T_CreateForeignServerStmt) if (nodeTag(parseTree) == T_CreateForeignTableStmt)
{
CreateForeignServerStmt *serverStatement = (CreateForeignServerStmt *) parseTree;
char *foreignWrapperName = serverStatement->fdwname;
if (strncmp(foreignWrapperName, CSTORE_FDW_NAME, NAMEDATALEN) == 0)
{
CreateCStoreDatabaseDirectory(MyDatabaseId);
}
}
else if (nodeTag(parseTree) == T_CreateForeignTableStmt)
{ {
CreateForeignTableStmt *createStatement = (CreateForeignTableStmt *) parseTree; CreateForeignTableStmt *createStatement = (CreateForeignTableStmt *) parseTree;
char *serverName = createStatement->servername; char *serverName = createStatement->servername;
@ -280,8 +268,6 @@ cstore_ddl_event_end_trigger(PG_FUNCTION_ARGS)
* We have no chance to hook into server creation to create data * We have no chance to hook into server creation to create data
* directory for it during database creation time. * directory for it during database creation time.
*/ */
CreateCStoreDatabaseDirectory(MyDatabaseId);
InitializeCStoreTableFile(relationId, relation, CStoreGetOptions(relationId)); InitializeCStoreTableFile(relationId, relation, CStoreGetOptions(relationId));
heap_close(relation, AccessExclusiveLock); heap_close(relation, AccessExclusiveLock);
} }
@ -361,16 +347,10 @@ CStoreProcessUtility(Node * parseTree, const char * queryString,
CALL_PREVIOUS_UTILITY(parseTree, queryString, context, paramListInfo, CALL_PREVIOUS_UTILITY(parseTree, queryString, context, paramListInfo,
destReceiver, completionTag); destReceiver, completionTag);
if (removeCStoreDirectory)
{
RemoveCStoreDatabaseDirectory(MyDatabaseId);
}
} }
else else
{ {
List *dropRelids = DroppedCStoreRelidList((DropStmt *) parseTree); List *dropRelids = DroppedCStoreRelidList((DropStmt *) parseTree);
List *dropFiles = NIL;
ListCell *lc = NULL; ListCell *lc = NULL;
/* drop smgr storage */ /* drop smgr storage */
@ -378,35 +358,14 @@ CStoreProcessUtility(Node * parseTree, const char * queryString,
{ {
Oid relid = lfirst_oid(lc); Oid relid = lfirst_oid(lc);
Relation relation = cstore_fdw_open(relid, AccessExclusiveLock); Relation relation = cstore_fdw_open(relid, AccessExclusiveLock);
CStoreOptions *cstoreOptions = CStoreGetOptions(relid);
char *defaultfilename = CStoreDefaultFilePath(relid);
RelationOpenSmgr(relation); RelationOpenSmgr(relation);
RelationDropStorage(relation); RelationDropStorage(relation);
heap_close(relation, AccessExclusiveLock); heap_close(relation, AccessExclusiveLock);
/*
* Skip files that are placed in default location, they are handled
* by sql drop trigger. Both paths are generated by code, use
* of strcmp is safe here.
*/
if (strcmp(defaultfilename, cstoreOptions->filename) == 0)
{
continue;
}
dropFiles = lappend(dropFiles, cstoreOptions->filename);
} }
CALL_PREVIOUS_UTILITY(parseTree, queryString, context, paramListInfo, CALL_PREVIOUS_UTILITY(parseTree, queryString, context, paramListInfo,
destReceiver, completionTag); destReceiver, completionTag);
/* drop files */
foreach(lc, dropFiles)
{
char *filename = lfirst(lc);
DeleteCStoreTableFiles(filename);
}
} }
} }
else if (nodeTag(parseTree) == T_TruncateStmt) else if (nodeTag(parseTree) == T_TruncateStmt)
@ -449,18 +408,9 @@ CStoreProcessUtility(Node * parseTree, const char * queryString,
} }
else if (nodeTag(parseTree) == T_DropdbStmt) else if (nodeTag(parseTree) == T_DropdbStmt)
{ {
DropdbStmt *dropDdStmt = (DropdbStmt *) parseTree;
bool missingOk = true;
Oid databaseOid = get_database_oid(dropDdStmt->dbname, missingOk);
/* let postgres handle error checking and dropping of the database */ /* let postgres handle error checking and dropping of the database */
CALL_PREVIOUS_UTILITY(parseTree, queryString, context, paramListInfo, CALL_PREVIOUS_UTILITY(parseTree, queryString, context, paramListInfo,
destReceiver, completionTag); destReceiver, completionTag);
if (databaseOid != InvalidOid)
{
RemoveCStoreDatabaseDirectory(databaseOid);
}
} }
/* handle other utility statements */ /* handle other utility statements */
@ -642,11 +592,11 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString)
/* init state to write to the cstore file */ /* init state to write to the cstore file */
writeState = CStoreBeginWrite(relationId, writeState = CStoreBeginWrite(relationId,
cstoreOptions->filename,
cstoreOptions->compressionType, cstoreOptions->compressionType,
cstoreOptions->stripeRowCount, cstoreOptions->stripeRowCount,
cstoreOptions->blockRowCount, cstoreOptions->blockRowCount,
tupleDescriptor); tupleDescriptor);
writeState->relation = relation;
while (nextRowFound) while (nextRowFound)
{ {
@ -912,12 +862,7 @@ TruncateCStoreTables(List *cstoreRelationList)
Assert(CStoreTable(relationId)); Assert(CStoreTable(relationId));
cstoreOptions = CStoreGetOptions(relationId); cstoreOptions = CStoreGetOptions(relationId);
if (OidIsValid(relation->rd_rel->relfilenode)) InitializeRelFileNode(relation, true);
{
RelationOpenSmgr(relation);
RelationDropStorage(relation);
}
DeleteCStoreTableFiles(cstoreOptions->filename);
InitializeCStoreTableFile(relationId, relation, CStoreGetOptions(relationId)); InitializeCStoreTableFile(relationId, relation, CStoreGetOptions(relationId));
} }
} }
@ -927,12 +872,12 @@ TruncateCStoreTables(List *cstoreRelationList)
* tables. Version 12 and later do not, so we need to create one manually. * tables. Version 12 and later do not, so we need to create one manually.
*/ */
static void static void
InitializeRelFileNode(Relation relation) InitializeRelFileNode(Relation relation, bool force)
{ {
#if PG_VERSION_NUM >= 120000 #if PG_VERSION_NUM >= 120000
Relation pg_class; Relation pg_class;
HeapTuple tuple; HeapTuple tuple;
Form_pg_class classform; Form_pg_class classform;
/* /*
* Get a writable copy of the pg_class tuple for the given relation. * Get a writable copy of the pg_class tuple for the given relation.
@ -946,12 +891,12 @@ InitializeRelFileNode(Relation relation)
RelationGetRelid(relation)); RelationGetRelid(relation));
classform = (Form_pg_class) GETSTRUCT(tuple); classform = (Form_pg_class) GETSTRUCT(tuple);
if (!OidIsValid(classform->relfilenode)) if (!OidIsValid(classform->relfilenode) || force)
{ {
char persistence = relation->rd_rel->relpersistence;
Relation tmprel; Relation tmprel;
Oid tablespace; Oid tablespace;
Oid filenode = relation->rd_id; Oid filenode;
char persistence = relation->rd_rel->relpersistence;
RelFileNode newrnode; RelFileNode newrnode;
SMgrRelation srel; SMgrRelation srel;
@ -968,6 +913,8 @@ InitializeRelFileNode(Relation relation)
else else
tablespace = MyDatabaseTableSpace; tablespace = MyDatabaseTableSpace;
filenode = GetNewRelFileNode(tablespace, NULL, persistence);
newrnode.spcNode = tablespace; newrnode.spcNode = tablespace;
newrnode.dbNode = MyDatabaseId; newrnode.dbNode = MyDatabaseId;
newrnode.relNode = filenode; newrnode.relNode = filenode;
@ -1120,32 +1067,20 @@ Datum
cstore_table_size(PG_FUNCTION_ARGS) cstore_table_size(PG_FUNCTION_ARGS)
{ {
Oid relationId = PG_GETARG_OID(0); Oid relationId = PG_GETARG_OID(0);
int64 tableSize = 0;
CStoreOptions *cstoreOptions = NULL;
char *dataFilename = NULL;
int dataFileStatResult = 0;
struct stat dataFileStatBuffer;
bool cstoreTable = CStoreTable(relationId); bool cstoreTable = CStoreTable(relationId);
Relation relation;
BlockNumber nblocks;
if (!cstoreTable) if (!cstoreTable)
{ {
ereport(ERROR, (errmsg("relation is not a cstore table"))); ereport(ERROR, (errmsg("relation is not a cstore table")));
} }
cstoreOptions = CStoreGetOptions(relationId); relation = cstore_fdw_open(relationId, AccessShareLock);
dataFilename = cstoreOptions->filename; RelationOpenSmgr(relation);
nblocks = smgrnblocks(relation->rd_smgr, MAIN_FORKNUM);
dataFileStatResult = stat(dataFilename, &dataFileStatBuffer); heap_close(relation, AccessShareLock);
if (dataFileStatResult != 0) PG_RETURN_INT64(nblocks * BLCKSZ);
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not stat file \"%s\": %m", dataFilename)));
}
tableSize += dataFileStatBuffer.st_size;
PG_RETURN_INT64(tableSize);
} }
@ -1197,7 +1132,6 @@ cstore_fdw_validator(PG_FUNCTION_ARGS)
Oid optionContextId = PG_GETARG_OID(1); Oid optionContextId = PG_GETARG_OID(1);
List *optionList = untransformRelOptions(optionArray); List *optionList = untransformRelOptions(optionArray);
ListCell *optionCell = NULL; ListCell *optionCell = NULL;
char *filename = NULL;
char *compressionTypeString = NULL; char *compressionTypeString = NULL;
char *stripeRowCountString = NULL; char *stripeRowCountString = NULL;
char *blockRowCountString = NULL; char *blockRowCountString = NULL;
@ -1232,11 +1166,7 @@ cstore_fdw_validator(PG_FUNCTION_ARGS)
optionNamesString->data))); optionNamesString->data)));
} }
if (strncmp(optionName, OPTION_NAME_FILENAME, NAMEDATALEN) == 0) if (strncmp(optionName, OPTION_NAME_COMPRESSION_TYPE, NAMEDATALEN) == 0)
{
filename = defGetString(optionDef);
}
else if (strncmp(optionName, OPTION_NAME_COMPRESSION_TYPE, NAMEDATALEN) == 0)
{ {
compressionTypeString = defGetString(optionDef); compressionTypeString = defGetString(optionDef);
} }
@ -1252,7 +1182,7 @@ cstore_fdw_validator(PG_FUNCTION_ARGS)
if (optionContextId == ForeignTableRelationId) if (optionContextId == ForeignTableRelationId)
{ {
ValidateForeignTableOptions(filename, compressionTypeString, ValidateForeignTableOptions(compressionTypeString,
stripeRowCountString, blockRowCountString); stripeRowCountString, blockRowCountString);
} }
@ -1271,11 +1201,6 @@ cstore_fdw_validator(PG_FUNCTION_ARGS)
Datum Datum
cstore_clean_table_resources(PG_FUNCTION_ARGS) cstore_clean_table_resources(PG_FUNCTION_ARGS)
{ {
Oid relationId = PG_GETARG_OID(0);
StringInfo filePath = makeStringInfo();
struct stat fileStat;
int statResult = -1;
/* /*
* TODO: Event triggers do not offer the relfilenode of the * TODO: Event triggers do not offer the relfilenode of the
* dropped table, and by the time the sql_drop event trigger * dropped table, and by the time the sql_drop event trigger
@ -1285,19 +1210,6 @@ cstore_clean_table_resources(PG_FUNCTION_ARGS)
* leak storage. * leak storage.
*/ */
appendStringInfo(filePath, "%s/%s/%d/%d", DataDir, CSTORE_FDW_NAME,
(int) MyDatabaseId, (int) relationId);
/*
* Check to see if the file exist first. This is the only way to
* find out if the table being dropped is a cstore table.
*/
statResult = stat(filePath->data, &fileStat);
if (statResult == 0)
{
DeleteCStoreTableFiles(filePath->data);
}
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -1359,7 +1271,6 @@ static CStoreOptions *
CStoreGetOptions(Oid foreignTableId) CStoreGetOptions(Oid foreignTableId)
{ {
CStoreOptions *cstoreOptions = NULL; CStoreOptions *cstoreOptions = NULL;
char *filename = NULL;
CompressionType compressionType = DEFAULT_COMPRESSION_TYPE; CompressionType compressionType = DEFAULT_COMPRESSION_TYPE;
int32 stripeRowCount = DEFAULT_STRIPE_ROW_COUNT; int32 stripeRowCount = DEFAULT_STRIPE_ROW_COUNT;
int32 blockRowCount = DEFAULT_BLOCK_ROW_COUNT; int32 blockRowCount = DEFAULT_BLOCK_ROW_COUNT;
@ -1367,7 +1278,6 @@ CStoreGetOptions(Oid foreignTableId)
char *stripeRowCountString = NULL; char *stripeRowCountString = NULL;
char *blockRowCountString = NULL; char *blockRowCountString = NULL;
filename = CStoreGetOptionValue(foreignTableId, OPTION_NAME_FILENAME);
compressionTypeString = CStoreGetOptionValue(foreignTableId, compressionTypeString = CStoreGetOptionValue(foreignTableId,
OPTION_NAME_COMPRESSION_TYPE); OPTION_NAME_COMPRESSION_TYPE);
stripeRowCountString = CStoreGetOptionValue(foreignTableId, stripeRowCountString = CStoreGetOptionValue(foreignTableId,
@ -1375,7 +1285,7 @@ CStoreGetOptions(Oid foreignTableId)
blockRowCountString = CStoreGetOptionValue(foreignTableId, blockRowCountString = CStoreGetOptionValue(foreignTableId,
OPTION_NAME_BLOCK_ROW_COUNT); OPTION_NAME_BLOCK_ROW_COUNT);
ValidateForeignTableOptions(filename, compressionTypeString, ValidateForeignTableOptions(compressionTypeString,
stripeRowCountString, blockRowCountString); stripeRowCountString, blockRowCountString);
/* parse provided options */ /* parse provided options */
@ -1392,14 +1302,7 @@ CStoreGetOptions(Oid foreignTableId)
blockRowCount = pg_atoi(blockRowCountString, sizeof(int32), 0); blockRowCount = pg_atoi(blockRowCountString, sizeof(int32), 0);
} }
/* set default filename if it is not provided */
if (filename == NULL)
{
filename = CStoreDefaultFilePath(foreignTableId);
}
cstoreOptions = palloc0(sizeof(CStoreOptions)); cstoreOptions = palloc0(sizeof(CStoreOptions));
cstoreOptions->filename = filename;
cstoreOptions->compressionType = compressionType; cstoreOptions->compressionType = compressionType;
cstoreOptions->stripeRowCount = stripeRowCount; cstoreOptions->stripeRowCount = stripeRowCount;
cstoreOptions->blockRowCount = blockRowCount; cstoreOptions->blockRowCount = blockRowCount;
@ -1450,12 +1353,9 @@ CStoreGetOptionValue(Oid foreignTableId, const char *optionName)
* considered invalid. * considered invalid.
*/ */
static void static void
ValidateForeignTableOptions(char *filename, char *compressionTypeString, ValidateForeignTableOptions(char *compressionTypeString,
char *stripeRowCountString, char *blockRowCountString) char *stripeRowCountString, char *blockRowCountString)
{ {
/* we currently do not have any checks for filename */
(void) filename;
/* check if the provided compression type is valid */ /* check if the provided compression type is valid */
if (compressionTypeString != NULL) if (compressionTypeString != NULL)
{ {
@ -1500,36 +1400,6 @@ ValidateForeignTableOptions(char *filename, char *compressionTypeString,
} }
/*
* CStoreDefaultFilePath constructs the default file path to use for a cstore_fdw
* table. The path is of the form $PGDATA/cstore_fdw/{databaseOid}/{relfilenode}.
*/
static char *
CStoreDefaultFilePath(Oid foreignTableId)
{
StringInfo cstoreFilePath = NULL;
Relation relation = cstore_fdw_open(foreignTableId, AccessShareLock);
RelFileNode relationFileNode = relation->rd_node;
Oid databaseOid = relationFileNode.dbNode;
Oid relationFileOid = relationFileNode.relNode;
relation_close(relation, AccessShareLock);
/* PG12 onward does not create relfilenode for foreign tables */
if (databaseOid == InvalidOid)
{
databaseOid = MyDatabaseId;
relationFileOid = foreignTableId;
}
cstoreFilePath = makeStringInfo();
appendStringInfo(cstoreFilePath, "%s/%s/%u/%u", DataDir, CSTORE_FDW_NAME,
databaseOid, relationFileOid);
return cstoreFilePath->data;
}
/* /*
* CStoreGetForeignRelSize obtains relation size estimates for a foreign table and * CStoreGetForeignRelSize obtains relation size estimates for a foreign table and
* puts its estimate for row count into baserel->rows. * puts its estimate for row count into baserel->rows.
@ -1537,14 +1407,14 @@ CStoreDefaultFilePath(Oid foreignTableId)
static void static void
CStoreGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId) CStoreGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId)
{ {
CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId); Relation relation = cstore_fdw_open(foreignTableId, AccessShareLock);
double tupleCountEstimate = TupleCountEstimate(foreignTableId, baserel, double tupleCountEstimate = TupleCountEstimate(relation, baserel);
cstoreOptions->filename);
double rowSelectivity = clauselist_selectivity(root, baserel->baserestrictinfo, double rowSelectivity = clauselist_selectivity(root, baserel->baserestrictinfo,
0, JOIN_INNER, NULL); 0, JOIN_INNER, NULL);
double outputRowCount = clamp_row_est(tupleCountEstimate * rowSelectivity); double outputRowCount = clamp_row_est(tupleCountEstimate * rowSelectivity);
baserel->rows = outputRowCount; baserel->rows = outputRowCount;
heap_close(relation, AccessShareLock);
} }
@ -1558,7 +1428,6 @@ static void
CStoreGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId) CStoreGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId)
{ {
Path *foreignScanPath = NULL; Path *foreignScanPath = NULL;
CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId);
Relation relation = cstore_fdw_open(foreignTableId, AccessShareLock); Relation relation = cstore_fdw_open(foreignTableId, AccessShareLock);
/* /*
@ -1579,15 +1448,14 @@ CStoreGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId
*/ */
List *queryColumnList = ColumnList(baserel, foreignTableId); List *queryColumnList = ColumnList(baserel, foreignTableId);
uint32 queryColumnCount = list_length(queryColumnList); uint32 queryColumnCount = list_length(queryColumnList);
BlockNumber relationPageCount = PageCount(cstoreOptions->filename); BlockNumber relationPageCount = PageCount(relation);
uint32 relationColumnCount = RelationGetNumberOfAttributes(relation); uint32 relationColumnCount = RelationGetNumberOfAttributes(relation);
double queryColumnRatio = (double) queryColumnCount / relationColumnCount; double queryColumnRatio = (double) queryColumnCount / relationColumnCount;
double queryPageCount = relationPageCount * queryColumnRatio; double queryPageCount = relationPageCount * queryColumnRatio;
double totalDiskAccessCost = seq_page_cost * queryPageCount; double totalDiskAccessCost = seq_page_cost * queryPageCount;
double tupleCountEstimate = TupleCountEstimate(foreignTableId, baserel, double tupleCountEstimate = TupleCountEstimate(relation, baserel);
cstoreOptions->filename);
/* /*
* We estimate costs almost the same way as cost_seqscan(), thus assuming * We estimate costs almost the same way as cost_seqscan(), thus assuming
@ -1692,7 +1560,7 @@ CStoreGetForeignPlan(PlannerInfo * root, RelOptInfo * baserel, Oid foreignTableI
* file. * file.
*/ */
static double static double
TupleCountEstimate(Oid relid, RelOptInfo *baserel, const char *filename) TupleCountEstimate(Relation relation, RelOptInfo *baserel)
{ {
double tupleCountEstimate = 0.0; double tupleCountEstimate = 0.0;
@ -1705,13 +1573,13 @@ TupleCountEstimate(Oid relid, RelOptInfo *baserel, const char *filename)
* that by the current file size. * that by the current file size.
*/ */
double tupleDensity = baserel->tuples / (double) baserel->pages; double tupleDensity = baserel->tuples / (double) baserel->pages;
BlockNumber pageCount = PageCount(filename); BlockNumber pageCount = PageCount(relation);
tupleCountEstimate = clamp_row_est(tupleDensity * (double) pageCount); tupleCountEstimate = clamp_row_est(tupleDensity * (double) pageCount);
} }
else else
{ {
tupleCountEstimate = (double) CStoreTableRowCount(relid, filename); tupleCountEstimate = (double) CStoreTableRowCount(relation);
} }
return tupleCountEstimate; return tupleCountEstimate;
@ -1720,25 +1588,14 @@ TupleCountEstimate(Oid relid, RelOptInfo *baserel, const char *filename)
/* PageCount calculates and returns the number of pages in a file. */ /* PageCount calculates and returns the number of pages in a file. */
static BlockNumber static BlockNumber
PageCount(const char *filename) PageCount(Relation relation)
{ {
BlockNumber pageCount = 0; BlockNumber nblocks;
struct stat statBuffer;
/* if file doesn't exist at plan time, use default estimate for its size */ RelationOpenSmgr(relation);
int statResult = stat(filename, &statBuffer); nblocks = smgrnblocks(relation->rd_smgr, MAIN_FORKNUM);
if (statResult < 0)
{
statBuffer.st_size = 10 * BLCKSZ;
}
pageCount = (statBuffer.st_size + (BLCKSZ - 1)) / BLCKSZ; return (nblocks > 0) ? nblocks : 1;
if (pageCount < 1)
{
pageCount = 1;
}
return pageCount;
} }
@ -1856,25 +1713,18 @@ ColumnList(RelOptInfo *baserel, Oid foreignTableId)
static void static void
CStoreExplainForeignScan(ForeignScanState *scanState, ExplainState *explainState) CStoreExplainForeignScan(ForeignScanState *scanState, ExplainState *explainState)
{ {
Relation relation = scanState->ss.ss_currentRelation; Relation relation = scanState->ss.ss_currentRelation;
CStoreOptions *cstoreOptions;
Oid foreignTableId;
cstore_fdw_initrel(relation); cstore_fdw_initrel(relation);
foreignTableId = RelationGetRelid(relation);
cstoreOptions = CStoreGetOptions(foreignTableId);
/* supress file size if we're not showing cost details */ /* supress file size if we're not showing cost details */
if (explainState->costs) if (explainState->costs)
{ {
struct stat statBuffer; long nblocks;
RelationOpenSmgr(relation);
int statResult = stat(cstoreOptions->filename, &statBuffer); nblocks = smgrnblocks(relation->rd_smgr, MAIN_FORKNUM);
if (statResult == 0) ExplainPropertyLong("CStore File Size", (long) (nblocks * BLCKSZ),
{ explainState);
ExplainPropertyLong("CStore File Size", (long) statBuffer.st_size,
explainState);
}
} }
} }
@ -1909,8 +1759,9 @@ CStoreBeginForeignScan(ForeignScanState *scanState, int executorFlags)
whereClauseList = foreignScan->scan.plan.qual; whereClauseList = foreignScan->scan.plan.qual;
columnList = (List *) linitial(foreignPrivateList); columnList = (List *) linitial(foreignPrivateList);
readState = CStoreBeginRead(foreignTableId, cstoreOptions->filename, readState = CStoreBeginRead(foreignTableId,
tupleDescriptor, columnList, whereClauseList); tupleDescriptor, columnList, whereClauseList);
readState->relation = cstore_fdw_open(foreignTableId, AccessShareLock);
scanState->fdw_state = (void *) readState; scanState->fdw_state = (void *) readState;
} }
@ -1956,6 +1807,7 @@ CStoreEndForeignScan(ForeignScanState *scanState)
TableReadState *readState = (TableReadState *) scanState->fdw_state; TableReadState *readState = (TableReadState *) scanState->fdw_state;
if (readState != NULL) if (readState != NULL)
{ {
heap_close(readState->relation, AccessShareLock);
CStoreEndRead(readState); CStoreEndRead(readState);
} }
} }
@ -1979,22 +1831,9 @@ CStoreAnalyzeForeignTable(Relation relation,
AcquireSampleRowsFunc *acquireSampleRowsFunc, AcquireSampleRowsFunc *acquireSampleRowsFunc,
BlockNumber *totalPageCount) BlockNumber *totalPageCount)
{ {
Oid foreignTableId = RelationGetRelid(relation);
CStoreOptions *cstoreOptions;
struct stat statBuffer;
cstore_fdw_initrel(relation); cstore_fdw_initrel(relation);
cstoreOptions = CStoreGetOptions(foreignTableId); RelationOpenSmgr(relation);
(*totalPageCount) = smgrnblocks(relation->rd_smgr, MAIN_FORKNUM);
int statResult = stat(cstoreOptions->filename, &statBuffer);
if (statResult < 0)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not stat file \"%s\": %m",
cstoreOptions->filename)));
}
(*totalPageCount) = PageCount(cstoreOptions->filename);
(*acquireSampleRowsFunc) = CStoreAcquireSampleRows; (*acquireSampleRowsFunc) = CStoreAcquireSampleRows;
return true; return true;
@ -2267,7 +2106,6 @@ CStoreBeginForeignInsert(ModifyTableState *modifyTableState, ResultRelInfo *rela
tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc); tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc);
writeState = CStoreBeginWrite(foreignTableOid, writeState = CStoreBeginWrite(foreignTableOid,
cstoreOptions->filename,
cstoreOptions->compressionType, cstoreOptions->compressionType,
cstoreOptions->stripeRowCount, cstoreOptions->stripeRowCount,
cstoreOptions->blockRowCount, cstoreOptions->blockRowCount,
@ -2372,7 +2210,7 @@ cstore_fdw_initrel(Relation rel)
{ {
#if PG_VERSION_NUM >= 120000 #if PG_VERSION_NUM >= 120000
if (rel->rd_rel->relfilenode == InvalidOid) if (rel->rd_rel->relfilenode == InvalidOid)
InitializeRelFileNode(rel); InitializeRelFileNode(rel, false);
/* /*
* Copied code from RelationInitPhysicalAddr(), which doesn't * Copied code from RelationInitPhysicalAddr(), which doesn't

View File

@ -38,7 +38,7 @@
#include "cstore_version_compat.h" #include "cstore_version_compat.h"
/* static function declarations */ /* static function declarations */
static StripeBuffers * LoadFilteredStripeBuffers(FILE *tableFile, static StripeBuffers * LoadFilteredStripeBuffers(Relation relation,
StripeMetadata *stripeMetadata, StripeMetadata *stripeMetadata,
StripeFooter *stripeFooter, StripeFooter *stripeFooter,
TupleDesc tupleDescriptor, TupleDesc tupleDescriptor,
@ -48,12 +48,12 @@ static void ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColum
uint64 blockIndex, uint64 blockRowIndex, uint64 blockIndex, uint64 blockRowIndex,
ColumnBlockData **blockDataArray, ColumnBlockData **blockDataArray,
Datum *columnValues, bool *columnNulls); Datum *columnValues, bool *columnNulls);
static ColumnBuffers * LoadColumnBuffers(FILE *tableFile, static ColumnBuffers * LoadColumnBuffers(Relation relation,
ColumnBlockSkipNode *blockSkipNodeArray, ColumnBlockSkipNode *blockSkipNodeArray,
uint32 blockCount, uint64 existsFileOffset, uint32 blockCount, uint64 existsFileOffset,
uint64 valueFileOffset, uint64 valueFileOffset,
Form_pg_attribute attributeForm); Form_pg_attribute attributeForm);
static StripeSkipList * LoadStripeSkipList(FILE *tableFile, static StripeSkipList * LoadStripeSkipList(Relation relation,
StripeMetadata *stripeMetadata, StripeMetadata *stripeMetadata,
StripeFooter *stripeFooter, StripeFooter *stripeFooter,
uint32 columnCount, uint32 columnCount,
@ -82,10 +82,10 @@ static void DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex
TupleDesc tupleDescriptor); TupleDesc tupleDescriptor);
static Datum ColumnDefaultValue(TupleConstr *tupleConstraints, static Datum ColumnDefaultValue(TupleConstr *tupleConstraints,
Form_pg_attribute attributeForm); Form_pg_attribute attributeForm);
static StringInfo ReadFromFile(FILE *file, uint64 offset, uint32 size); static StringInfo ReadFromSmgr(Relation rel, uint64 offset, uint32 size);
static void ResetUncompressedBlockData(ColumnBlockData **blockDataArray, static void ResetUncompressedBlockData(ColumnBlockData **blockDataArray,
uint32 columnCount); uint32 columnCount);
static uint64 StripeRowCount(Oid relid, FILE *tableFile, StripeMetadata *stripeMetadata); static uint64 StripeRowCount(Relation relation, StripeMetadata *stripeMetadata);
static int RelationColumnCount(Oid relid); static int RelationColumnCount(Oid relid);
@ -94,12 +94,11 @@ static int RelationColumnCount(Oid relid);
* read handle that's used during reading rows and finishing the read operation. * read handle that's used during reading rows and finishing the read operation.
*/ */
TableReadState * TableReadState *
CStoreBeginRead(Oid relationId, const char *filename, TupleDesc tupleDescriptor, CStoreBeginRead(Oid relationId, TupleDesc tupleDescriptor,
List *projectedColumnList, List *whereClauseList) List *projectedColumnList, List *whereClauseList)
{ {
TableReadState *readState = NULL; TableReadState *readState = NULL;
TableMetadata *tableMetadata = NULL; TableMetadata *tableMetadata = NULL;
FILE *tableFile = NULL;
MemoryContext stripeReadContext = NULL; MemoryContext stripeReadContext = NULL;
uint32 columnCount = 0; uint32 columnCount = 0;
bool *projectedColumnMask = NULL; bool *projectedColumnMask = NULL;
@ -107,14 +106,6 @@ CStoreBeginRead(Oid relationId, const char *filename, TupleDesc tupleDescriptor,
tableMetadata = ReadTableMetadata(relationId); tableMetadata = ReadTableMetadata(relationId);
tableFile = AllocateFile(filename, PG_BINARY_R);
if (tableFile == NULL)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not open file \"%s\" for reading: %m",
filename)));
}
/* /*
* We allocate all stripe specific data in the stripeReadContext, and reset * We allocate all stripe specific data in the stripeReadContext, and reset
* this memory context before loading a new stripe. This is to avoid memory * this memory context before loading a new stripe. This is to avoid memory
@ -131,7 +122,6 @@ CStoreBeginRead(Oid relationId, const char *filename, TupleDesc tupleDescriptor,
readState = palloc0(sizeof(TableReadState)); readState = palloc0(sizeof(TableReadState));
readState->relationId = relationId; readState->relationId = relationId;
readState->tableFile = tableFile;
readState->tableMetadata = tableMetadata; readState->tableMetadata = tableMetadata;
readState->projectedColumnList = projectedColumnList; readState->projectedColumnList = projectedColumnList;
readState->whereClauseList = whereClauseList; readState->whereClauseList = whereClauseList;
@ -187,7 +177,8 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu
stripeFooter = ReadStripeFooter(readState->relationId, stripeFooter = ReadStripeFooter(readState->relationId,
stripeMetadata->id, stripeMetadata->id,
readState->tupleDescriptor->natts); readState->tupleDescriptor->natts);
stripeBuffers = LoadFilteredStripeBuffers(readState->tableFile, stripeMetadata, stripeBuffers = LoadFilteredStripeBuffers(readState->relation,
stripeMetadata,
stripeFooter, stripeFooter,
readState->tupleDescriptor, readState->tupleDescriptor,
readState->projectedColumnList, readState->projectedColumnList,
@ -263,7 +254,6 @@ CStoreEndRead(TableReadState *readState)
int columnCount = readState->tupleDescriptor->natts; int columnCount = readState->tupleDescriptor->natts;
MemoryContextDelete(readState->stripeReadContext); MemoryContextDelete(readState->stripeReadContext);
FreeFile(readState->tableFile);
list_free_deep(readState->tableMetadata->stripeMetadataList); list_free_deep(readState->tableMetadata->stripeMetadataList);
FreeColumnBlockDataArray(readState->blockDataArray, columnCount); FreeColumnBlockDataArray(readState->blockDataArray, columnCount);
pfree(readState->tableMetadata); pfree(readState->tableMetadata);
@ -326,30 +316,20 @@ FreeColumnBlockDataArray(ColumnBlockData **blockDataArray, uint32 columnCount)
/* CStoreTableRowCount returns the exact row count of a table using skiplists */ /* CStoreTableRowCount returns the exact row count of a table using skiplists */
uint64 uint64
CStoreTableRowCount(Oid relid, const char *filename) CStoreTableRowCount(Relation relation)
{ {
TableMetadata *tableMetadata = NULL; TableMetadata *tableMetadata = NULL;
FILE *tableFile;
ListCell *stripeMetadataCell = NULL; ListCell *stripeMetadataCell = NULL;
uint64 totalRowCount = 0; uint64 totalRowCount = 0;
tableMetadata = ReadTableMetadata(relid); tableMetadata = ReadTableMetadata(relation->rd_id);
tableFile = AllocateFile(filename, PG_BINARY_R);
if (tableFile == NULL)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not open file \"%s\" for reading: %m", filename)));
}
foreach(stripeMetadataCell, tableMetadata->stripeMetadataList) foreach(stripeMetadataCell, tableMetadata->stripeMetadataList)
{ {
StripeMetadata *stripeMetadata = (StripeMetadata *) lfirst(stripeMetadataCell); StripeMetadata *stripeMetadata = (StripeMetadata *) lfirst(stripeMetadataCell);
totalRowCount += StripeRowCount(relid, tableFile, stripeMetadata); totalRowCount += StripeRowCount(relation, stripeMetadata);
} }
FreeFile(tableFile);
return totalRowCount; return totalRowCount;
} }
@ -359,15 +339,15 @@ CStoreTableRowCount(Oid relid, const char *filename)
* skip list, and returns number of rows for given stripe. * skip list, and returns number of rows for given stripe.
*/ */
static uint64 static uint64
StripeRowCount(Oid relid, FILE *tableFile, StripeMetadata *stripeMetadata) StripeRowCount(Relation relation, StripeMetadata *stripeMetadata)
{ {
uint64 rowCount = 0; uint64 rowCount = 0;
StringInfo firstColumnSkipListBuffer = NULL; StringInfo firstColumnSkipListBuffer = NULL;
StripeFooter *stripeFooter = ReadStripeFooter(relid, stripeMetadata->id, StripeFooter *stripeFooter = ReadStripeFooter(relation->rd_id, stripeMetadata->id,
RelationColumnCount(relid)); RelationColumnCount(relation->rd_id));
firstColumnSkipListBuffer = ReadFromFile(tableFile, stripeMetadata->fileOffset, firstColumnSkipListBuffer = ReadFromSmgr(relation, stripeMetadata->fileOffset,
stripeFooter->skipListSizeArray[0]); stripeFooter->skipListSizeArray[0]);
rowCount = DeserializeRowCount(firstColumnSkipListBuffer); rowCount = DeserializeRowCount(firstColumnSkipListBuffer);
@ -381,7 +361,7 @@ StripeRowCount(Oid relid, FILE *tableFile, StripeMetadata *stripeMetadata)
* and only loads columns that are projected in the query. * and only loads columns that are projected in the query.
*/ */
static StripeBuffers * static StripeBuffers *
LoadFilteredStripeBuffers(FILE *tableFile, StripeMetadata *stripeMetadata, LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
StripeFooter *stripeFooter, TupleDesc tupleDescriptor, StripeFooter *stripeFooter, TupleDesc tupleDescriptor,
List *projectedColumnList, List *whereClauseList) List *projectedColumnList, List *whereClauseList)
{ {
@ -393,7 +373,7 @@ LoadFilteredStripeBuffers(FILE *tableFile, StripeMetadata *stripeMetadata,
bool *projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList); bool *projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList);
StripeSkipList *stripeSkipList = LoadStripeSkipList(tableFile, stripeMetadata, StripeSkipList *stripeSkipList = LoadStripeSkipList(relation, stripeMetadata,
stripeFooter, columnCount, stripeFooter, columnCount,
projectedColumnMask, projectedColumnMask,
tupleDescriptor); tupleDescriptor);
@ -423,7 +403,7 @@ LoadFilteredStripeBuffers(FILE *tableFile, StripeMetadata *stripeMetadata,
Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, columnIndex); Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, columnIndex);
uint32 blockCount = selectedBlockSkipList->blockCount; uint32 blockCount = selectedBlockSkipList->blockCount;
ColumnBuffers *columnBuffers = LoadColumnBuffers(tableFile, blockSkipNode, ColumnBuffers *columnBuffers = LoadColumnBuffers(relation, blockSkipNode,
blockCount, blockCount,
existsFileOffset, existsFileOffset,
valueFileOffset, valueFileOffset,
@ -482,7 +462,7 @@ ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColumnList,
* and lengths are retrieved from the column block skip node array. * and lengths are retrieved from the column block skip node array.
*/ */
static ColumnBuffers * static ColumnBuffers *
LoadColumnBuffers(FILE *tableFile, ColumnBlockSkipNode *blockSkipNodeArray, LoadColumnBuffers(Relation relation, ColumnBlockSkipNode *blockSkipNodeArray,
uint32 blockCount, uint64 existsFileOffset, uint64 valueFileOffset, uint32 blockCount, uint64 existsFileOffset, uint64 valueFileOffset,
Form_pg_attribute attributeForm) Form_pg_attribute attributeForm)
{ {
@ -505,7 +485,7 @@ LoadColumnBuffers(FILE *tableFile, ColumnBlockSkipNode *blockSkipNodeArray,
{ {
ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex]; ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex];
uint64 existsOffset = existsFileOffset + blockSkipNode->existsBlockOffset; uint64 existsOffset = existsFileOffset + blockSkipNode->existsBlockOffset;
StringInfo rawExistsBuffer = ReadFromFile(tableFile, existsOffset, StringInfo rawExistsBuffer = ReadFromSmgr(relation, existsOffset,
blockSkipNode->existsLength); blockSkipNode->existsLength);
blockBuffersArray[blockIndex]->existsBuffer = rawExistsBuffer; blockBuffersArray[blockIndex]->existsBuffer = rawExistsBuffer;
@ -517,7 +497,7 @@ LoadColumnBuffers(FILE *tableFile, ColumnBlockSkipNode *blockSkipNodeArray,
ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex]; ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex];
CompressionType compressionType = blockSkipNode->valueCompressionType; CompressionType compressionType = blockSkipNode->valueCompressionType;
uint64 valueOffset = valueFileOffset + blockSkipNode->valueBlockOffset; uint64 valueOffset = valueFileOffset + blockSkipNode->valueBlockOffset;
StringInfo rawValueBuffer = ReadFromFile(tableFile, valueOffset, StringInfo rawValueBuffer = ReadFromSmgr(relation, valueOffset,
blockSkipNode->valueLength); blockSkipNode->valueLength);
blockBuffersArray[blockIndex]->valueBuffer = rawValueBuffer; blockBuffersArray[blockIndex]->valueBuffer = rawValueBuffer;
@ -533,7 +513,8 @@ LoadColumnBuffers(FILE *tableFile, ColumnBlockSkipNode *blockSkipNodeArray,
/* Reads the skip list for the given stripe. */ /* Reads the skip list for the given stripe. */
static StripeSkipList * static StripeSkipList *
LoadStripeSkipList(FILE *tableFile, StripeMetadata *stripeMetadata, LoadStripeSkipList(Relation relation,
StripeMetadata *stripeMetadata,
StripeFooter *stripeFooter, uint32 columnCount, StripeFooter *stripeFooter, uint32 columnCount,
bool *projectedColumnMask, bool *projectedColumnMask,
TupleDesc tupleDescriptor) TupleDesc tupleDescriptor)
@ -547,7 +528,7 @@ LoadStripeSkipList(FILE *tableFile, StripeMetadata *stripeMetadata,
uint32 stripeColumnCount = stripeFooter->columnCount; uint32 stripeColumnCount = stripeFooter->columnCount;
/* deserialize block count */ /* deserialize block count */
firstColumnSkipListBuffer = ReadFromFile(tableFile, stripeMetadata->fileOffset, firstColumnSkipListBuffer = ReadFromSmgr(relation, stripeMetadata->fileOffset,
stripeFooter->skipListSizeArray[0]); stripeFooter->skipListSizeArray[0]);
stripeBlockCount = DeserializeBlockCount(firstColumnSkipListBuffer); stripeBlockCount = DeserializeBlockCount(firstColumnSkipListBuffer);
@ -570,7 +551,7 @@ LoadStripeSkipList(FILE *tableFile, StripeMetadata *stripeMetadata,
Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, columnIndex); Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, columnIndex);
StringInfo columnSkipListBuffer = StringInfo columnSkipListBuffer =
ReadFromFile(tableFile, currentColumnSkipListFileOffset, ReadFromSmgr(relation, currentColumnSkipListFileOffset,
columnSkipListSize); columnSkipListSize);
ColumnBlockSkipNode *columnSkipList = ColumnBlockSkipNode *columnSkipList =
DeserializeColumnSkipList(columnSkipListBuffer, attributeForm->attbyval, DeserializeColumnSkipList(columnSkipListBuffer, attributeForm->attbyval,
@ -1178,49 +1159,36 @@ ColumnDefaultValue(TupleConstr *tupleConstraints, Form_pg_attribute attributeFor
return defaultValue; return defaultValue;
} }
/* Reads the given segment from the given file. */
static StringInfo static StringInfo
ReadFromFile(FILE *file, uint64 offset, uint32 size) ReadFromSmgr(Relation rel, uint64 offset, uint32 size)
{ {
int fseekResult = 0; StringInfo resultBuffer = makeStringInfo();
int freadResult = 0; uint64 read = 0;
int fileError = 0;
StringInfo resultBuffer = makeStringInfo();
enlargeStringInfo(resultBuffer, size); enlargeStringInfo(resultBuffer, size);
resultBuffer->len = size; resultBuffer->len = size;
if (size == 0) while (read < size)
{ {
return resultBuffer; Buffer buffer;
} Page page;
PageHeader phdr;
uint32 to_read;
SmgrAddr addr = logical_to_smgr(offset + read);
errno = 0; buffer = ReadBuffer(rel, addr.blockno);
fseekResult = fseeko(file, offset, SEEK_SET); page = BufferGetPage(buffer);
if (fseekResult != 0) phdr = (PageHeader)page;
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not seek in file: %m")));
}
freadResult = fread(resultBuffer->data, size, 1, file); to_read = Min(size - read, phdr->pd_upper - addr.offset);
if (freadResult != 1) memcpy(resultBuffer->data + read, page + addr.offset, to_read);
{ ReleaseBuffer(buffer);
ereport(ERROR, (errmsg("could not read enough data from file"))); read += to_read;
}
fileError = ferror(file);
if (fileError != 0)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not read file: %m")));
} }
return resultBuffer; return resultBuffer;
} }
/* /*
* ResetUncompressedBlockData iterates over deserialized column block data * ResetUncompressedBlockData iterates over deserialized column block data
* and sets valueBuffer field to empty buffer. This field is allocated in stripe * and sets valueBuffer field to empty buffer. This field is allocated in stripe

View File

@ -16,12 +16,12 @@
#include "postgres.h" #include "postgres.h"
#include <sys/stat.h>
#include "access/nbtree.h" #include "access/nbtree.h"
#include "catalog/pg_am.h" #include "catalog/pg_am.h"
#include "storage/fd.h" #include "storage/fd.h"
#include "storage/smgr.h"
#include "utils/memutils.h" #include "utils/memutils.h"
#include "utils/rel.h"
#include "cstore.h" #include "cstore.h"
#include "cstore_metadata_serialization.h" #include "cstore_metadata_serialization.h"
@ -51,8 +51,6 @@ static void UpdateBlockSkipNodeMinMax(ColumnBlockSkipNode *blockSkipNode,
static Datum DatumCopy(Datum datum, bool datumTypeByValue, int datumTypeLength); static Datum DatumCopy(Datum datum, bool datumTypeByValue, int datumTypeLength);
static void AppendStripeMetadata(TableMetadata *tableMetadata, static void AppendStripeMetadata(TableMetadata *tableMetadata,
StripeMetadata stripeMetadata); StripeMetadata stripeMetadata);
static void WriteToFile(FILE *file, void *data, uint32 dataLength);
static void SyncAndCloseFile(FILE *file);
static StringInfo CopyStringInfo(StringInfo sourceString); static StringInfo CopyStringInfo(StringInfo sourceString);
@ -65,12 +63,11 @@ static StringInfo CopyStringInfo(StringInfo sourceString);
*/ */
TableWriteState * TableWriteState *
CStoreBeginWrite(Oid relationId, CStoreBeginWrite(Oid relationId,
const char *filename, CompressionType compressionType, CompressionType compressionType,
uint64 stripeMaxRowCount, uint32 blockRowCount, uint64 stripeMaxRowCount, uint32 blockRowCount,
TupleDesc tupleDescriptor) TupleDesc tupleDescriptor)
{ {
TableWriteState *writeState = NULL; TableWriteState *writeState = NULL;
FILE *tableFile = NULL;
TableMetadata *tableMetadata = NULL; TableMetadata *tableMetadata = NULL;
FmgrInfo **comparisonFunctionArray = NULL; FmgrInfo **comparisonFunctionArray = NULL;
MemoryContext stripeWriteContext = NULL; MemoryContext stripeWriteContext = NULL;
@ -81,14 +78,6 @@ CStoreBeginWrite(Oid relationId,
ColumnBlockData **blockData = NULL; ColumnBlockData **blockData = NULL;
uint64 currentStripeId = 0; uint64 currentStripeId = 0;
tableFile = AllocateFile(filename, "a+");
if (tableFile == NULL)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not open file \"%s\" for writing: %m",
filename)));
}
tableMetadata = ReadTableMetadata(relationId); tableMetadata = ReadTableMetadata(relationId);
/* /*
@ -99,7 +88,6 @@ CStoreBeginWrite(Oid relationId,
{ {
StripeMetadata *lastStripe = NULL; StripeMetadata *lastStripe = NULL;
uint64 lastStripeSize = 0; uint64 lastStripeSize = 0;
int fseekResult = 0;
lastStripe = llast(tableMetadata->stripeMetadataList); lastStripe = llast(tableMetadata->stripeMetadataList);
lastStripeSize += lastStripe->skipListLength; lastStripeSize += lastStripe->skipListLength;
@ -108,14 +96,6 @@ CStoreBeginWrite(Oid relationId,
currentFileOffset = lastStripe->fileOffset + lastStripeSize; currentFileOffset = lastStripe->fileOffset + lastStripeSize;
currentStripeId = lastStripe->id + 1; currentStripeId = lastStripe->id + 1;
errno = 0;
fseekResult = fseeko(tableFile, currentFileOffset, SEEK_SET);
if (fseekResult != 0)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not seek in file \"%s\": %m", filename)));
}
} }
/* get comparison function pointers for each of the columns */ /* get comparison function pointers for each of the columns */
@ -154,7 +134,6 @@ CStoreBeginWrite(Oid relationId,
writeState = palloc0(sizeof(TableWriteState)); writeState = palloc0(sizeof(TableWriteState));
writeState->relationId = relationId; writeState->relationId = relationId;
writeState->tableFile = tableFile;
writeState->tableMetadata = tableMetadata; writeState->tableMetadata = tableMetadata;
writeState->compressionType = compressionType; writeState->compressionType = compressionType;
writeState->stripeMaxRowCount = stripeMaxRowCount; writeState->stripeMaxRowCount = stripeMaxRowCount;
@ -312,8 +291,6 @@ CStoreEndWrite(TableWriteState *writeState)
AppendStripeMetadata(writeState->tableMetadata, stripeMetadata); AppendStripeMetadata(writeState->tableMetadata, stripeMetadata);
} }
SyncAndCloseFile(writeState->tableFile);
MemoryContextDelete(writeState->stripeWriteContext); MemoryContextDelete(writeState->stripeWriteContext);
list_free_deep(writeState->tableMetadata->stripeMetadataList); list_free_deep(writeState->tableMetadata->stripeMetadataList);
pfree(writeState->comparisonFunctionArray); pfree(writeState->comparisonFunctionArray);
@ -391,6 +368,57 @@ CreateEmptyStripeSkipList(uint32 stripeMaxRowCount, uint32 blockRowCount,
return stripeSkipList; return stripeSkipList;
} }
static void
WriteToSmgr(TableWriteState *writeState, char *data, uint32 dataLength)
{
uint64 logicalOffset = writeState->currentFileOffset;
uint64 remaining = dataLength;
Relation rel = writeState->relation;
Buffer buffer;
while (remaining > 0)
{
SmgrAddr addr = logical_to_smgr(logicalOffset);
BlockNumber nblocks;
Page page;
PageHeader phdr;
uint64 to_write;
RelationOpenSmgr(rel);
nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
while (addr.blockno >= nblocks)
{
Buffer buffer = ReadBuffer(rel, P_NEW);
ReleaseBuffer(buffer);
nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
}
RelationCloseSmgr(rel);
buffer = ReadBuffer(rel, addr.blockno);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
page = BufferGetPage(buffer);
phdr = (PageHeader) page;
if (PageIsNew(page))
PageInit(page, BLCKSZ, 0);
/* always appending */
Assert(phdr->pd_lower == addr.offset);
to_write = Min(phdr->pd_upper - phdr->pd_lower, remaining);
memcpy(page + phdr->pd_lower, data, to_write);
phdr->pd_lower += to_write;
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
data += to_write;
remaining -= to_write;
logicalOffset += to_write;
}
}
/* /*
* FlushStripe flushes current stripe data into the file. The function first ensures * FlushStripe flushes current stripe data into the file. The function first ensures
@ -409,7 +437,6 @@ FlushStripe(TableWriteState *writeState)
uint32 columnIndex = 0; uint32 columnIndex = 0;
uint32 blockIndex = 0; uint32 blockIndex = 0;
TableMetadata *tableMetadata = writeState->tableMetadata; TableMetadata *tableMetadata = writeState->tableMetadata;
FILE *tableFile = writeState->tableFile;
StripeBuffers *stripeBuffers = writeState->stripeBuffers; StripeBuffers *stripeBuffers = writeState->stripeBuffers;
StripeSkipList *stripeSkipList = writeState->stripeSkipList; StripeSkipList *stripeSkipList = writeState->stripeSkipList;
ColumnBlockSkipNode **columnSkipNodeArray = stripeSkipList->blockSkipNodeArray; ColumnBlockSkipNode **columnSkipNodeArray = stripeSkipList->blockSkipNodeArray;
@ -419,6 +446,7 @@ FlushStripe(TableWriteState *writeState)
uint32 blockRowCount = tableMetadata->blockRowCount; uint32 blockRowCount = tableMetadata->blockRowCount;
uint32 lastBlockIndex = stripeBuffers->rowCount / blockRowCount; uint32 lastBlockIndex = stripeBuffers->rowCount / blockRowCount;
uint32 lastBlockRowCount = stripeBuffers->rowCount % blockRowCount; uint32 lastBlockRowCount = stripeBuffers->rowCount % blockRowCount;
uint64 initialFileOffset = writeState->currentFileOffset;
/* /*
* check if the last block needs serialization , the last block was not serialized * check if the last block needs serialization , the last block was not serialized
@ -479,7 +507,8 @@ FlushStripe(TableWriteState *writeState)
for (columnIndex = 0; columnIndex < columnCount; columnIndex++) for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{ {
StringInfo skipListBuffer = skipListBufferArray[columnIndex]; StringInfo skipListBuffer = skipListBufferArray[columnIndex];
WriteToFile(tableFile, skipListBuffer->data, skipListBuffer->len); WriteToSmgr(writeState, skipListBuffer->data, skipListBuffer->len);
writeState->currentFileOffset += skipListBuffer->len;
} }
/* then, we flush the data buffers */ /* then, we flush the data buffers */
@ -494,7 +523,8 @@ FlushStripe(TableWriteState *writeState)
columnBuffers->blockBuffersArray[blockIndex]; columnBuffers->blockBuffersArray[blockIndex];
StringInfo existsBuffer = blockBuffers->existsBuffer; StringInfo existsBuffer = blockBuffers->existsBuffer;
WriteToFile(tableFile, existsBuffer->data, existsBuffer->len); WriteToSmgr(writeState, existsBuffer->data, existsBuffer->len);
writeState->currentFileOffset += existsBuffer->len;
} }
for (blockIndex = 0; blockIndex < stripeSkipList->blockCount; blockIndex++) for (blockIndex = 0; blockIndex < stripeSkipList->blockCount; blockIndex++)
@ -503,7 +533,8 @@ FlushStripe(TableWriteState *writeState)
columnBuffers->blockBuffersArray[blockIndex]; columnBuffers->blockBuffersArray[blockIndex];
StringInfo valueBuffer = blockBuffers->valueBuffer; StringInfo valueBuffer = blockBuffers->valueBuffer;
WriteToFile(tableFile, valueBuffer->data, valueBuffer->len); WriteToSmgr(writeState, valueBuffer->data, valueBuffer->len);
writeState->currentFileOffset += valueBuffer->len;
} }
} }
@ -520,16 +551,12 @@ FlushStripe(TableWriteState *writeState)
dataLength += stripeFooter->valueSizeArray[columnIndex]; dataLength += stripeFooter->valueSizeArray[columnIndex];
} }
stripeMetadata.fileOffset = writeState->currentFileOffset; stripeMetadata.fileOffset = initialFileOffset;
stripeMetadata.skipListLength = skipListLength; stripeMetadata.skipListLength = skipListLength;
stripeMetadata.dataLength = dataLength; stripeMetadata.dataLength = dataLength;
stripeMetadata.footerLength = 0; stripeMetadata.footerLength = 0;
stripeMetadata.id = writeState->currentStripeId; stripeMetadata.id = writeState->currentStripeId;
/* advance current file offset */
writeState->currentFileOffset += skipListLength;
writeState->currentFileOffset += dataLength;
return stripeMetadata; return stripeMetadata;
} }
@ -834,76 +861,6 @@ AppendStripeMetadata(TableMetadata *tableMetadata, StripeMetadata stripeMetadata
stripeMetadataCopy); stripeMetadataCopy);
} }
/* Writes the given data to the given file pointer and checks for errors. */
static void
WriteToFile(FILE *file, void *data, uint32 dataLength)
{
int writeResult = 0;
int errorResult = 0;
if (dataLength == 0)
{
return;
}
errno = 0;
writeResult = fwrite(data, dataLength, 1, file);
if (writeResult != 1)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not write file: %m")));
}
errorResult = ferror(file);
if (errorResult != 0)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("error in file: %m")));
}
}
/* Flushes, syncs, and closes the given file pointer and checks for errors. */
static void
SyncAndCloseFile(FILE *file)
{
int flushResult = 0;
int syncResult = 0;
int errorResult = 0;
int freeResult = 0;
errno = 0;
flushResult = fflush(file);
if (flushResult != 0)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not flush file: %m")));
}
syncResult = pg_fsync(fileno(file));
if (syncResult != 0)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not sync file: %m")));
}
errorResult = ferror(file);
if (errorResult != 0)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("error in file: %m")));
}
freeResult = FreeFile(file);
if (freeResult != 0)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not close file: %m")));
}
}
/* /*
* CopyStringInfo creates a deep copy of given source string allocating only needed * CopyStringInfo creates a deep copy of given source string allocating only needed
* amount of memory. * amount of memory.

View File

@ -12,17 +12,6 @@
-- 'postgres' directory is excluded from comparison to have the same result. -- 'postgres' directory is excluded from comparison to have the same result.
-- store postgres database oid -- store postgres database oid
SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset
-- Check that files for the automatically managed table exist in the
-- cstore_fdw/{databaseoid} directory.
SELECT count(*) FROM (
SELECT pg_ls_dir('cstore_fdw/' || databaseoid ) FROM (
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database()
) AS q1) AS q2;
count
-------
2
(1 row)
-- DROP cstore_fdw tables -- DROP cstore_fdw tables
DROP FOREIGN TABLE contestant; DROP FOREIGN TABLE contestant;
DROP FOREIGN TABLE contestant_compressed; DROP FOREIGN TABLE contestant_compressed;
@ -31,17 +20,6 @@ CREATE SCHEMA test_schema;
CREATE FOREIGN TABLE test_schema.test_table(data int) SERVER cstore_server; CREATE FOREIGN TABLE test_schema.test_table(data int) SERVER cstore_server;
DROP SCHEMA test_schema CASCADE; DROP SCHEMA test_schema CASCADE;
NOTICE: drop cascades to foreign table test_schema.test_table NOTICE: drop cascades to foreign table test_schema.test_table
-- Check that the files have been deleted and the directory is empty after the
-- DROP table command.
SELECT count(*) FROM (
SELECT pg_ls_dir('cstore_fdw/' || databaseoid ) FROM (
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database()
) AS q1) AS q2;
count
-------
0
(1 row)
SELECT current_database() datname \gset SELECT current_database() datname \gset
CREATE DATABASE db_to_drop; CREATE DATABASE db_to_drop;
\c db_to_drop \c db_to_drop
@ -49,49 +27,14 @@ CREATE EXTENSION cstore_fdw;
CREATE SERVER cstore_server FOREIGN DATA WRAPPER cstore_fdw; CREATE SERVER cstore_server FOREIGN DATA WRAPPER cstore_fdw;
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database() \gset SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database() \gset
CREATE FOREIGN TABLE test_table(data int) SERVER cstore_server; CREATE FOREIGN TABLE test_table(data int) SERVER cstore_server;
-- should see 2 files, data and footer file for single table
SELECT count(*) FROM pg_ls_dir('cstore_fdw/' || :databaseoid);
count
-------
2
(1 row)
-- should see 2 directories 1 for each database, excluding postgres database
SELECT count(*) FROM pg_ls_dir('cstore_fdw') WHERE pg_ls_dir != :postgres_oid::text;
count
-------
2
(1 row)
DROP EXTENSION cstore_fdw CASCADE; DROP EXTENSION cstore_fdw CASCADE;
NOTICE: drop cascades to 2 other objects NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to server cstore_server DETAIL: drop cascades to server cstore_server
drop cascades to foreign table test_table drop cascades to foreign table test_table
-- should only see 1 directory here
SELECT count(*) FROM pg_ls_dir('cstore_fdw') WHERE pg_ls_dir != :postgres_oid::text;
count
-------
1
(1 row)
-- test database drop -- test database drop
CREATE EXTENSION cstore_fdw; CREATE EXTENSION cstore_fdw;
CREATE SERVER cstore_server FOREIGN DATA WRAPPER cstore_fdw; CREATE SERVER cstore_server FOREIGN DATA WRAPPER cstore_fdw;
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database() \gset SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database() \gset
CREATE FOREIGN TABLE test_table(data int) SERVER cstore_server; CREATE FOREIGN TABLE test_table(data int) SERVER cstore_server;
-- should see 2 directories 1 for each database
SELECT count(*) FROM pg_ls_dir('cstore_fdw') WHERE pg_ls_dir != :postgres_oid::text;
count
-------
2
(1 row)
\c :datname \c :datname
DROP DATABASE db_to_drop; DROP DATABASE db_to_drop;
-- should only see 1 directory for the default database
SELECT count(*) FROM pg_ls_dir('cstore_fdw') WHERE pg_ls_dir != :postgres_oid::text;
count
-------
1
(1 row)

View File

@ -9,17 +9,6 @@ SELECT substring(:'server_version', '\d+')::int > 10 AS version_above_ten;
t t
(1 row) (1 row)
-- Check that files for the automatically managed table exist in the
-- cstore_fdw/{databaseoid} directory.
SELECT count(*) FROM (
SELECT pg_ls_dir('cstore_fdw/' || databaseoid ) FROM (
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database()
) AS q1) AS q2;
count
-------
0
(1 row)
-- CREATE a cstore_fdw table, fill with some data -- -- CREATE a cstore_fdw table, fill with some data --
CREATE FOREIGN TABLE cstore_truncate_test (a int, b int) SERVER cstore_server; CREATE FOREIGN TABLE cstore_truncate_test (a int, b int) SERVER cstore_server;
CREATE FOREIGN TABLE cstore_truncate_test_second (a int, b int) SERVER cstore_server; CREATE FOREIGN TABLE cstore_truncate_test_second (a int, b int) SERVER cstore_server;
@ -75,16 +64,6 @@ SELECT cstore_table_size('cstore_truncate_test_compressed');
0 0
(1 row) (1 row)
-- make sure data files still present
SELECT count(*) FROM (
SELECT pg_ls_dir('cstore_fdw/' || databaseoid ) FROM (
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database()
) AS q1) AS q2;
count
-------
3
(1 row)
INSERT INTO cstore_truncate_test select a, a from generate_series(1, 10) a; INSERT INTO cstore_truncate_test select a, a from generate_series(1, 10) a;
INSERT INTO cstore_truncate_test_regular select a, a from generate_series(10, 20) a; INSERT INTO cstore_truncate_test_regular select a, a from generate_series(10, 20) a;
INSERT INTO cstore_truncate_test_second select a, a from generate_series(20, 30) a; INSERT INTO cstore_truncate_test_second select a, a from generate_series(20, 30) a;
@ -250,13 +229,3 @@ SELECT count(*) FROM truncate_schema.truncate_tbl;
DROP SCHEMA truncate_schema CASCADE; DROP SCHEMA truncate_schema CASCADE;
NOTICE: drop cascades to foreign table truncate_schema.truncate_tbl NOTICE: drop cascades to foreign table truncate_schema.truncate_tbl
DROP USER truncate_user; DROP USER truncate_user;
-- verify files are removed
SELECT count(*) FROM (
SELECT pg_ls_dir('cstore_fdw/' || databaseoid ) FROM (
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database()
) AS q1) AS q2;
count
-------
0
(1 row)

View File

@ -30,8 +30,7 @@ $$ LANGUAGE PLPGSQL;
-- Create and load data -- Create and load data
CREATE FOREIGN TABLE test_block_filtering (a int) CREATE FOREIGN TABLE test_block_filtering (a int)
SERVER cstore_server SERVER cstore_server
OPTIONS(filename '@abs_srcdir@/data/block_filtering.cstore', OPTIONS(block_row_count '1000', stripe_row_count '2000');
block_row_count '1000', stripe_row_count '2000');
COPY test_block_filtering FROM '@abs_srcdir@/data/block_filtering.csv' WITH CSV; COPY test_block_filtering FROM '@abs_srcdir@/data/block_filtering.csv' WITH CSV;
@ -60,8 +59,7 @@ SELECT filtered_row_count('SELECT count(*) FROM test_block_filtering WHERE a BET
-- Verify that we are fine with collations which use a different alphabet order -- Verify that we are fine with collations which use a different alphabet order
CREATE FOREIGN TABLE collation_block_filtering_test(A text collate "da_DK") CREATE FOREIGN TABLE collation_block_filtering_test(A text collate "da_DK")
SERVER cstore_server SERVER cstore_server;
OPTIONS(filename '@abs_srcdir@/data/collation_block_filtering.cstore');
COPY collation_block_filtering_test FROM STDIN; COPY collation_block_filtering_test FROM STDIN;
A A
Å Å

View File

@ -3,8 +3,7 @@
-- --
CREATE FOREIGN TABLE test_contestant(handle TEXT, birthdate DATE, rating INT, CREATE FOREIGN TABLE test_contestant(handle TEXT, birthdate DATE, rating INT,
percentile FLOAT, country CHAR(3), achievements TEXT[]) percentile FLOAT, country CHAR(3), achievements TEXT[])
SERVER cstore_server SERVER cstore_server;
OPTIONS(filename '@abs_srcdir@/data/test_contestant.cstore');
-- load table data from file -- load table data from file
COPY test_contestant FROM '@abs_srcdir@/data/contestants.1.csv' WITH CSV; COPY test_contestant FROM '@abs_srcdir@/data/contestants.1.csv' WITH CSV;

View File

@ -12,30 +12,24 @@ CREATE SERVER cstore_server FOREIGN DATA WRAPPER cstore_fdw;
-- Validator tests -- Validator tests
CREATE FOREIGN TABLE test_validator_invalid_option () CREATE FOREIGN TABLE test_validator_invalid_option ()
SERVER cstore_server SERVER cstore_server
OPTIONS(filename 'data.cstore', bad_option_name '1'); -- ERROR OPTIONS(bad_option_name '1'); -- ERROR
CREATE FOREIGN TABLE test_validator_invalid_stripe_row_count () CREATE FOREIGN TABLE test_validator_invalid_stripe_row_count ()
SERVER cstore_server SERVER cstore_server
OPTIONS(filename 'data.cstore', stripe_row_count '0'); -- ERROR OPTIONS(stripe_row_count '0'); -- ERROR
CREATE FOREIGN TABLE test_validator_invalid_block_row_count () CREATE FOREIGN TABLE test_validator_invalid_block_row_count ()
SERVER cstore_server SERVER cstore_server
OPTIONS(filename 'data.cstore', block_row_count '0'); -- ERROR OPTIONS(block_row_count '0'); -- ERROR
CREATE FOREIGN TABLE test_validator_invalid_compression_type () CREATE FOREIGN TABLE test_validator_invalid_compression_type ()
SERVER cstore_server SERVER cstore_server
OPTIONS(filename 'data.cstore', compression 'invalid_compression'); -- ERROR OPTIONS(compression 'invalid_compression'); -- ERROR
-- Invalid file path test
CREATE FOREIGN TABLE test_invalid_file_path ()
SERVER cstore_server
OPTIONS(filename 'bad_directory_path/bad_file_path'); --ERROR
-- Create uncompressed table -- Create uncompressed table
CREATE FOREIGN TABLE contestant (handle TEXT, birthdate DATE, rating INT, CREATE FOREIGN TABLE contestant (handle TEXT, birthdate DATE, rating INT,
percentile FLOAT, country CHAR(3), achievements TEXT[]) percentile FLOAT, country CHAR(3), achievements TEXT[])
SERVER cstore_server SERVER cstore_server;
OPTIONS(filename '@abs_srcdir@/data/contestant.cstore');
-- Create compressed table with automatically determined file path -- Create compressed table with automatically determined file path

View File

@ -11,8 +11,7 @@ SET intervalstyle TO 'POSTGRES_VERBOSE';
-- Test array types -- Test array types
CREATE FOREIGN TABLE test_array_types (int_array int[], bigint_array bigint[], CREATE FOREIGN TABLE test_array_types (int_array int[], bigint_array bigint[],
text_array text[]) SERVER cstore_server text_array text[]) SERVER cstore_server;
OPTIONS(filename '@abs_srcdir@/data/array_types.cstore');
COPY test_array_types FROM '@abs_srcdir@/data/array_types.csv' WITH CSV; COPY test_array_types FROM '@abs_srcdir@/data/array_types.csv' WITH CSV;
@ -22,8 +21,7 @@ SELECT * FROM test_array_types;
-- Test date/time types -- Test date/time types
CREATE FOREIGN TABLE test_datetime_types (timestamp timestamp, CREATE FOREIGN TABLE test_datetime_types (timestamp timestamp,
timestamp_with_timezone timestamp with time zone, date date, time time, timestamp_with_timezone timestamp with time zone, date date, time time,
interval interval) SERVER cstore_server interval interval) SERVER cstore_server;
OPTIONS(filename '@abs_srcdir@/data/datetime_types.cstore');
COPY test_datetime_types FROM '@abs_srcdir@/data/datetime_types.csv' WITH CSV; COPY test_datetime_types FROM '@abs_srcdir@/data/datetime_types.csv' WITH CSV;
@ -35,8 +33,7 @@ CREATE TYPE enum_type AS ENUM ('a', 'b', 'c');
CREATE TYPE composite_type AS (a int, b text); CREATE TYPE composite_type AS (a int, b text);
CREATE FOREIGN TABLE test_enum_and_composite_types (enum enum_type, CREATE FOREIGN TABLE test_enum_and_composite_types (enum enum_type,
composite composite_type) SERVER cstore_server composite composite_type) SERVER cstore_server;
OPTIONS(filename '@abs_srcdir@/data/enum_and_composite_types.cstore');
COPY test_enum_and_composite_types FROM COPY test_enum_and_composite_types FROM
'@abs_srcdir@/data/enum_and_composite_types.csv' WITH CSV; '@abs_srcdir@/data/enum_and_composite_types.csv' WITH CSV;
@ -46,8 +43,7 @@ SELECT * FROM test_enum_and_composite_types;
-- Test range types -- Test range types
CREATE FOREIGN TABLE test_range_types (int4range int4range, int8range int8range, CREATE FOREIGN TABLE test_range_types (int4range int4range, int8range int8range,
numrange numrange, tsrange tsrange) SERVER cstore_server numrange numrange, tsrange tsrange) SERVER cstore_server;
OPTIONS(filename '@abs_srcdir@/data/range_types.cstore');
COPY test_range_types FROM '@abs_srcdir@/data/range_types.csv' WITH CSV; COPY test_range_types FROM '@abs_srcdir@/data/range_types.csv' WITH CSV;
@ -56,8 +52,7 @@ SELECT * FROM test_range_types;
-- Test other types -- Test other types
CREATE FOREIGN TABLE test_other_types (bool boolean, bytea bytea, money money, CREATE FOREIGN TABLE test_other_types (bool boolean, bytea bytea, money money,
inet inet, bitstring bit varying(5), uuid uuid, json json) SERVER cstore_server inet inet, bitstring bit varying(5), uuid uuid, json json) SERVER cstore_server;
OPTIONS(filename '@abs_srcdir@/data/other_types.cstore');
COPY test_other_types FROM '@abs_srcdir@/data/other_types.csv' WITH CSV; COPY test_other_types FROM '@abs_srcdir@/data/other_types.csv' WITH CSV;
@ -66,8 +61,7 @@ SELECT * FROM test_other_types;
-- Test null values -- Test null values
CREATE FOREIGN TABLE test_null_values (a int, b int[], c composite_type) CREATE FOREIGN TABLE test_null_values (a int, b int[], c composite_type)
SERVER cstore_server SERVER cstore_server;
OPTIONS(filename '@abs_srcdir@/data/null_values.cstore');
COPY test_null_values FROM '@abs_srcdir@/data/null_values.csv' WITH CSV; COPY test_null_values FROM '@abs_srcdir@/data/null_values.csv' WITH CSV;

View File

@ -26,8 +26,7 @@ $$ LANGUAGE PLPGSQL;
-- Create and load data -- Create and load data
CREATE FOREIGN TABLE test_block_filtering (a int) CREATE FOREIGN TABLE test_block_filtering (a int)
SERVER cstore_server SERVER cstore_server
OPTIONS(filename '@abs_srcdir@/data/block_filtering.cstore', OPTIONS(block_row_count '1000', stripe_row_count '2000');
block_row_count '1000', stripe_row_count '2000');
COPY test_block_filtering FROM '@abs_srcdir@/data/block_filtering.csv' WITH CSV; COPY test_block_filtering FROM '@abs_srcdir@/data/block_filtering.csv' WITH CSV;
-- Verify that filtered_row_count is less than 1000 for the following queries -- Verify that filtered_row_count is less than 1000 for the following queries
SELECT filtered_row_count('SELECT count(*) FROM test_block_filtering'); SELECT filtered_row_count('SELECT count(*) FROM test_block_filtering');
@ -107,8 +106,7 @@ SELECT filtered_row_count('SELECT count(*) FROM test_block_filtering WHERE a BET
-- Verify that we are fine with collations which use a different alphabet order -- Verify that we are fine with collations which use a different alphabet order
CREATE FOREIGN TABLE collation_block_filtering_test(A text collate "da_DK") CREATE FOREIGN TABLE collation_block_filtering_test(A text collate "da_DK")
SERVER cstore_server SERVER cstore_server;
OPTIONS(filename '@abs_srcdir@/data/collation_block_filtering.cstore');
COPY collation_block_filtering_test FROM STDIN; COPY collation_block_filtering_test FROM STDIN;
SELECT * FROM collation_block_filtering_test WHERE A > 'B'; SELECT * FROM collation_block_filtering_test WHERE A > 'B';
a a

View File

@ -3,8 +3,7 @@
-- --
CREATE FOREIGN TABLE test_contestant(handle TEXT, birthdate DATE, rating INT, CREATE FOREIGN TABLE test_contestant(handle TEXT, birthdate DATE, rating INT,
percentile FLOAT, country CHAR(3), achievements TEXT[]) percentile FLOAT, country CHAR(3), achievements TEXT[])
SERVER cstore_server SERVER cstore_server;
OPTIONS(filename '@abs_srcdir@/data/test_contestant.cstore');
-- load table data from file -- load table data from file
COPY test_contestant FROM '@abs_srcdir@/data/contestants.1.csv' WITH CSV; COPY test_contestant FROM '@abs_srcdir@/data/contestants.1.csv' WITH CSV;
-- export using COPY table TO ... -- export using COPY table TO ...

View File

@ -7,34 +7,28 @@ CREATE SERVER cstore_server FOREIGN DATA WRAPPER cstore_fdw;
-- Validator tests -- Validator tests
CREATE FOREIGN TABLE test_validator_invalid_option () CREATE FOREIGN TABLE test_validator_invalid_option ()
SERVER cstore_server SERVER cstore_server
OPTIONS(filename 'data.cstore', bad_option_name '1'); -- ERROR OPTIONS(bad_option_name '1'); -- ERROR
ERROR: invalid option "bad_option_name" ERROR: invalid option "bad_option_name"
HINT: Valid options in this context are: filename, compression, stripe_row_count, block_row_count HINT: Valid options in this context are: compression, stripe_row_count, block_row_count
CREATE FOREIGN TABLE test_validator_invalid_stripe_row_count () CREATE FOREIGN TABLE test_validator_invalid_stripe_row_count ()
SERVER cstore_server SERVER cstore_server
OPTIONS(filename 'data.cstore', stripe_row_count '0'); -- ERROR OPTIONS(stripe_row_count '0'); -- ERROR
ERROR: invalid stripe row count ERROR: invalid stripe row count
HINT: Stripe row count must be an integer between 1000 and 10000000 HINT: Stripe row count must be an integer between 1000 and 10000000
CREATE FOREIGN TABLE test_validator_invalid_block_row_count () CREATE FOREIGN TABLE test_validator_invalid_block_row_count ()
SERVER cstore_server SERVER cstore_server
OPTIONS(filename 'data.cstore', block_row_count '0'); -- ERROR OPTIONS(block_row_count '0'); -- ERROR
ERROR: invalid block row count ERROR: invalid block row count
HINT: Block row count must be an integer between 1000 and 100000 HINT: Block row count must be an integer between 1000 and 100000
CREATE FOREIGN TABLE test_validator_invalid_compression_type () CREATE FOREIGN TABLE test_validator_invalid_compression_type ()
SERVER cstore_server SERVER cstore_server
OPTIONS(filename 'data.cstore', compression 'invalid_compression'); -- ERROR OPTIONS(compression 'invalid_compression'); -- ERROR
ERROR: invalid compression type ERROR: invalid compression type
HINT: Valid options are: none, pglz HINT: Valid options are: none, pglz
-- Invalid file path test
CREATE FOREIGN TABLE test_invalid_file_path ()
SERVER cstore_server
OPTIONS(filename 'bad_directory_path/bad_file_path'); --ERROR
ERROR: could not open file "bad_directory_path/bad_file_path" for writing: No such file or directory
-- Create uncompressed table -- Create uncompressed table
CREATE FOREIGN TABLE contestant (handle TEXT, birthdate DATE, rating INT, CREATE FOREIGN TABLE contestant (handle TEXT, birthdate DATE, rating INT,
percentile FLOAT, country CHAR(3), achievements TEXT[]) percentile FLOAT, country CHAR(3), achievements TEXT[])
SERVER cstore_server SERVER cstore_server;
OPTIONS(filename '@abs_srcdir@/data/contestant.cstore');
-- Create compressed table with automatically determined file path -- Create compressed table with automatically determined file path
CREATE FOREIGN TABLE contestant_compressed (handle TEXT, birthdate DATE, rating INT, CREATE FOREIGN TABLE contestant_compressed (handle TEXT, birthdate DATE, rating INT,
percentile FLOAT, country CHAR(3), achievements TEXT[]) percentile FLOAT, country CHAR(3), achievements TEXT[])

View File

@ -7,8 +7,7 @@ SET timezone to 'GMT';
SET intervalstyle TO 'POSTGRES_VERBOSE'; SET intervalstyle TO 'POSTGRES_VERBOSE';
-- Test array types -- Test array types
CREATE FOREIGN TABLE test_array_types (int_array int[], bigint_array bigint[], CREATE FOREIGN TABLE test_array_types (int_array int[], bigint_array bigint[],
text_array text[]) SERVER cstore_server text_array text[]) SERVER cstore_server;
OPTIONS(filename '@abs_srcdir@/data/array_types.cstore');
COPY test_array_types FROM '@abs_srcdir@/data/array_types.csv' WITH CSV; COPY test_array_types FROM '@abs_srcdir@/data/array_types.csv' WITH CSV;
SELECT * FROM test_array_types; SELECT * FROM test_array_types;
int_array | bigint_array | text_array int_array | bigint_array | text_array
@ -21,8 +20,7 @@ SELECT * FROM test_array_types;
-- Test date/time types -- Test date/time types
CREATE FOREIGN TABLE test_datetime_types (timestamp timestamp, CREATE FOREIGN TABLE test_datetime_types (timestamp timestamp,
timestamp_with_timezone timestamp with time zone, date date, time time, timestamp_with_timezone timestamp with time zone, date date, time time,
interval interval) SERVER cstore_server interval interval) SERVER cstore_server;
OPTIONS(filename '@abs_srcdir@/data/datetime_types.cstore');
COPY test_datetime_types FROM '@abs_srcdir@/data/datetime_types.csv' WITH CSV; COPY test_datetime_types FROM '@abs_srcdir@/data/datetime_types.csv' WITH CSV;
SELECT * FROM test_datetime_types; SELECT * FROM test_datetime_types;
timestamp | timestamp_with_timezone | date | time | interval timestamp | timestamp_with_timezone | date | time | interval
@ -35,8 +33,7 @@ SELECT * FROM test_datetime_types;
CREATE TYPE enum_type AS ENUM ('a', 'b', 'c'); CREATE TYPE enum_type AS ENUM ('a', 'b', 'c');
CREATE TYPE composite_type AS (a int, b text); CREATE TYPE composite_type AS (a int, b text);
CREATE FOREIGN TABLE test_enum_and_composite_types (enum enum_type, CREATE FOREIGN TABLE test_enum_and_composite_types (enum enum_type,
composite composite_type) SERVER cstore_server composite composite_type) SERVER cstore_server;
OPTIONS(filename '@abs_srcdir@/data/enum_and_composite_types.cstore');
COPY test_enum_and_composite_types FROM COPY test_enum_and_composite_types FROM
'@abs_srcdir@/data/enum_and_composite_types.csv' WITH CSV; '@abs_srcdir@/data/enum_and_composite_types.csv' WITH CSV;
SELECT * FROM test_enum_and_composite_types; SELECT * FROM test_enum_and_composite_types;
@ -48,8 +45,7 @@ SELECT * FROM test_enum_and_composite_types;
-- Test range types -- Test range types
CREATE FOREIGN TABLE test_range_types (int4range int4range, int8range int8range, CREATE FOREIGN TABLE test_range_types (int4range int4range, int8range int8range,
numrange numrange, tsrange tsrange) SERVER cstore_server numrange numrange, tsrange tsrange) SERVER cstore_server;
OPTIONS(filename '@abs_srcdir@/data/range_types.cstore');
COPY test_range_types FROM '@abs_srcdir@/data/range_types.csv' WITH CSV; COPY test_range_types FROM '@abs_srcdir@/data/range_types.csv' WITH CSV;
SELECT * FROM test_range_types; SELECT * FROM test_range_types;
int4range | int8range | numrange | tsrange int4range | int8range | numrange | tsrange
@ -60,8 +56,7 @@ SELECT * FROM test_range_types;
-- Test other types -- Test other types
CREATE FOREIGN TABLE test_other_types (bool boolean, bytea bytea, money money, CREATE FOREIGN TABLE test_other_types (bool boolean, bytea bytea, money money,
inet inet, bitstring bit varying(5), uuid uuid, json json) SERVER cstore_server inet inet, bitstring bit varying(5), uuid uuid, json json) SERVER cstore_server;
OPTIONS(filename '@abs_srcdir@/data/other_types.cstore');
COPY test_other_types FROM '@abs_srcdir@/data/other_types.csv' WITH CSV; COPY test_other_types FROM '@abs_srcdir@/data/other_types.csv' WITH CSV;
SELECT * FROM test_other_types; SELECT * FROM test_other_types;
bool | bytea | money | inet | bitstring | uuid | json bool | bytea | money | inet | bitstring | uuid | json
@ -72,8 +67,7 @@ SELECT * FROM test_other_types;
-- Test null values -- Test null values
CREATE FOREIGN TABLE test_null_values (a int, b int[], c composite_type) CREATE FOREIGN TABLE test_null_values (a int, b int[], c composite_type)
SERVER cstore_server SERVER cstore_server;
OPTIONS(filename '@abs_srcdir@/data/null_values.cstore');
COPY test_null_values FROM '@abs_srcdir@/data/null_values.csv' WITH CSV; COPY test_null_values FROM '@abs_srcdir@/data/null_values.csv' WITH CSV;
SELECT * FROM test_null_values; SELECT * FROM test_null_values;
a | b | c a | b | c

View File

@ -15,13 +15,6 @@
-- store postgres database oid -- store postgres database oid
SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset
-- Check that files for the automatically managed table exist in the
-- cstore_fdw/{databaseoid} directory.
SELECT count(*) FROM (
SELECT pg_ls_dir('cstore_fdw/' || databaseoid ) FROM (
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database()
) AS q1) AS q2;
-- DROP cstore_fdw tables -- DROP cstore_fdw tables
DROP FOREIGN TABLE contestant; DROP FOREIGN TABLE contestant;
DROP FOREIGN TABLE contestant_compressed; DROP FOREIGN TABLE contestant_compressed;
@ -31,13 +24,6 @@ CREATE SCHEMA test_schema;
CREATE FOREIGN TABLE test_schema.test_table(data int) SERVER cstore_server; CREATE FOREIGN TABLE test_schema.test_table(data int) SERVER cstore_server;
DROP SCHEMA test_schema CASCADE; DROP SCHEMA test_schema CASCADE;
-- Check that the files have been deleted and the directory is empty after the
-- DROP table command.
SELECT count(*) FROM (
SELECT pg_ls_dir('cstore_fdw/' || databaseoid ) FROM (
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database()
) AS q1) AS q2;
SELECT current_database() datname \gset SELECT current_database() datname \gset
CREATE DATABASE db_to_drop; CREATE DATABASE db_to_drop;
@ -47,17 +33,9 @@ CREATE SERVER cstore_server FOREIGN DATA WRAPPER cstore_fdw;
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database() \gset SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database() \gset
CREATE FOREIGN TABLE test_table(data int) SERVER cstore_server; CREATE FOREIGN TABLE test_table(data int) SERVER cstore_server;
-- should see 2 files, data and footer file for single table
SELECT count(*) FROM pg_ls_dir('cstore_fdw/' || :databaseoid);
-- should see 2 directories 1 for each database, excluding postgres database
SELECT count(*) FROM pg_ls_dir('cstore_fdw') WHERE pg_ls_dir != :postgres_oid::text;
DROP EXTENSION cstore_fdw CASCADE; DROP EXTENSION cstore_fdw CASCADE;
-- should only see 1 directory here
SELECT count(*) FROM pg_ls_dir('cstore_fdw') WHERE pg_ls_dir != :postgres_oid::text;
-- test database drop -- test database drop
CREATE EXTENSION cstore_fdw; CREATE EXTENSION cstore_fdw;
CREATE SERVER cstore_server FOREIGN DATA WRAPPER cstore_fdw; CREATE SERVER cstore_server FOREIGN DATA WRAPPER cstore_fdw;
@ -65,12 +43,6 @@ SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database()
CREATE FOREIGN TABLE test_table(data int) SERVER cstore_server; CREATE FOREIGN TABLE test_table(data int) SERVER cstore_server;
-- should see 2 directories 1 for each database
SELECT count(*) FROM pg_ls_dir('cstore_fdw') WHERE pg_ls_dir != :postgres_oid::text;
\c :datname \c :datname
DROP DATABASE db_to_drop; DROP DATABASE db_to_drop;
-- should only see 1 directory for the default database
SELECT count(*) FROM pg_ls_dir('cstore_fdw') WHERE pg_ls_dir != :postgres_oid::text;

View File

@ -6,13 +6,6 @@
SHOW server_version \gset SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int > 10 AS version_above_ten; SELECT substring(:'server_version', '\d+')::int > 10 AS version_above_ten;
-- Check that files for the automatically managed table exist in the
-- cstore_fdw/{databaseoid} directory.
SELECT count(*) FROM (
SELECT pg_ls_dir('cstore_fdw/' || databaseoid ) FROM (
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database()
) AS q1) AS q2;
-- CREATE a cstore_fdw table, fill with some data -- -- CREATE a cstore_fdw table, fill with some data --
CREATE FOREIGN TABLE cstore_truncate_test (a int, b int) SERVER cstore_server; CREATE FOREIGN TABLE cstore_truncate_test (a int, b int) SERVER cstore_server;
CREATE FOREIGN TABLE cstore_truncate_test_second (a int, b int) SERVER cstore_server; CREATE FOREIGN TABLE cstore_truncate_test_second (a int, b int) SERVER cstore_server;
@ -39,12 +32,6 @@ SELECT count(*) FROM cstore_truncate_test_compressed;
SELECT cstore_table_size('cstore_truncate_test_compressed'); SELECT cstore_table_size('cstore_truncate_test_compressed');
-- make sure data files still present
SELECT count(*) FROM (
SELECT pg_ls_dir('cstore_fdw/' || databaseoid ) FROM (
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database()
) AS q1) AS q2;
INSERT INTO cstore_truncate_test select a, a from generate_series(1, 10) a; INSERT INTO cstore_truncate_test select a, a from generate_series(1, 10) a;
INSERT INTO cstore_truncate_test_regular select a, a from generate_series(10, 20) a; INSERT INTO cstore_truncate_test_regular select a, a from generate_series(10, 20) a;
INSERT INTO cstore_truncate_test_second select a, a from generate_series(20, 30) a; INSERT INTO cstore_truncate_test_second select a, a from generate_series(20, 30) a;
@ -127,9 +114,3 @@ SELECT count(*) FROM truncate_schema.truncate_tbl;
-- cleanup -- cleanup
DROP SCHEMA truncate_schema CASCADE; DROP SCHEMA truncate_schema CASCADE;
DROP USER truncate_user; DROP USER truncate_user;
-- verify files are removed
SELECT count(*) FROM (
SELECT pg_ls_dir('cstore_fdw/' || databaseoid ) FROM (
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database()
) AS q1) AS q2;