mirror of https://github.com/citusdata/citus.git
Replace file access with Smgr
parent
a2f7eadeb9
commit
dee408248c
132
cstore.c
132
cstore.c
|
@ -21,9 +21,6 @@
|
|||
|
||||
#include "cstore.h"
|
||||
|
||||
static void CreateDirectory(StringInfo directoryName);
|
||||
static bool DirectoryExists(StringInfo directoryName);
|
||||
|
||||
/* ParseCompressionType converts a string to a compression type. */
|
||||
CompressionType
|
||||
ParseCompressionType(const char *compressionTypeString)
|
||||
|
@ -44,80 +41,6 @@ ParseCompressionType(const char *compressionTypeString)
|
|||
}
|
||||
|
||||
|
||||
/* CreateDirectory creates a new directory with the given directory name. */
|
||||
static void
|
||||
CreateDirectory(StringInfo directoryName)
|
||||
{
|
||||
int makeOK = mkdir(directoryName->data, S_IRWXU);
|
||||
if (makeOK != 0)
|
||||
{
|
||||
ereport(ERROR, (errcode_for_file_access(),
|
||||
errmsg("could not create directory \"%s\": %m",
|
||||
directoryName->data)));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* DirectoryExists checks if a directory exists for the given directory name. */
|
||||
static bool
|
||||
DirectoryExists(StringInfo directoryName)
|
||||
{
|
||||
bool directoryExists = true;
|
||||
struct stat directoryStat;
|
||||
|
||||
int statOK = stat(directoryName->data, &directoryStat);
|
||||
if (statOK == 0)
|
||||
{
|
||||
/* file already exists; check that it is a directory */
|
||||
if (!S_ISDIR(directoryStat.st_mode))
|
||||
{
|
||||
ereport(ERROR, (errmsg("\"%s\" is not a directory", directoryName->data),
|
||||
errhint("You need to remove or rename the file \"%s\".",
|
||||
directoryName->data)));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (errno == ENOENT)
|
||||
{
|
||||
directoryExists = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(ERROR, (errcode_for_file_access(),
|
||||
errmsg("could not stat directory \"%s\": %m",
|
||||
directoryName->data)));
|
||||
}
|
||||
}
|
||||
|
||||
return directoryExists;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* RemoveCStoreDatabaseDirectory removes CStore directory previously
|
||||
* created for this database.
|
||||
* However it does not remove 'cstore_fdw' directory even if there
|
||||
* are no other databases left.
|
||||
*/
|
||||
void
|
||||
RemoveCStoreDatabaseDirectory(Oid databaseOid)
|
||||
{
|
||||
StringInfo cstoreDirectoryPath = makeStringInfo();
|
||||
StringInfo cstoreDatabaseDirectoryPath = makeStringInfo();
|
||||
|
||||
appendStringInfo(cstoreDirectoryPath, "%s/%s", DataDir, CSTORE_FDW_NAME);
|
||||
|
||||
appendStringInfo(cstoreDatabaseDirectoryPath, "%s/%s/%u", DataDir,
|
||||
CSTORE_FDW_NAME, databaseOid);
|
||||
|
||||
if (DirectoryExists(cstoreDatabaseDirectoryPath))
|
||||
{
|
||||
rmtree(cstoreDatabaseDirectoryPath->data, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* InitializeCStoreTableFile creates data and footer file for a cstore table.
|
||||
* The function assumes data and footer files do not exist, therefore
|
||||
|
@ -136,62 +59,9 @@ InitializeCStoreTableFile(Oid relationId, Relation relation, CStoreOptions *csto
|
|||
* Initialize state to write to the cstore file. This creates an
|
||||
* empty data file and a valid footer file for the table.
|
||||
*/
|
||||
writeState = CStoreBeginWrite(relationId, cstoreOptions->filename,
|
||||
writeState = CStoreBeginWrite(relationId,
|
||||
cstoreOptions->compressionType,
|
||||
cstoreOptions->stripeRowCount,
|
||||
cstoreOptions->blockRowCount, tupleDescriptor);
|
||||
CStoreEndWrite(writeState);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateCStoreDatabaseDirectory creates the directory (and parent directories,
|
||||
* if needed) used to store automatically managed cstore_fdw files. The path to
|
||||
* the directory is $PGDATA/cstore_fdw/{databaseOid}.
|
||||
*/
|
||||
void
|
||||
CreateCStoreDatabaseDirectory(Oid databaseOid)
|
||||
{
|
||||
bool cstoreDirectoryExists = false;
|
||||
bool databaseDirectoryExists = false;
|
||||
StringInfo cstoreDatabaseDirectoryPath = NULL;
|
||||
|
||||
StringInfo cstoreDirectoryPath = makeStringInfo();
|
||||
appendStringInfo(cstoreDirectoryPath, "%s/%s", DataDir, CSTORE_FDW_NAME);
|
||||
|
||||
cstoreDirectoryExists = DirectoryExists(cstoreDirectoryPath);
|
||||
if (!cstoreDirectoryExists)
|
||||
{
|
||||
CreateDirectory(cstoreDirectoryPath);
|
||||
}
|
||||
|
||||
cstoreDatabaseDirectoryPath = makeStringInfo();
|
||||
appendStringInfo(cstoreDatabaseDirectoryPath, "%s/%s/%u", DataDir,
|
||||
CSTORE_FDW_NAME, databaseOid);
|
||||
|
||||
databaseDirectoryExists = DirectoryExists(cstoreDatabaseDirectoryPath);
|
||||
if (!databaseDirectoryExists)
|
||||
{
|
||||
CreateDirectory(cstoreDatabaseDirectoryPath);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DeleteCStoreTableFiles deletes the data and footer files for a cstore table
|
||||
* whose data filename is given.
|
||||
*/
|
||||
void
|
||||
DeleteCStoreTableFiles(char *filename)
|
||||
{
|
||||
int dataFileRemoved = 0;
|
||||
|
||||
/* delete the data file */
|
||||
dataFileRemoved = unlink(filename);
|
||||
if (dataFileRemoved != 0)
|
||||
{
|
||||
ereport(WARNING, (errcode_for_file_access(),
|
||||
errmsg("could not delete file \"%s\": %m",
|
||||
filename)));
|
||||
}
|
||||
}
|
||||
|
|
32
cstore.h
32
cstore.h
|
@ -16,10 +16,10 @@
|
|||
|
||||
#include "fmgr.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "storage/bufpage.h"
|
||||
#include "utils/relcache.h"
|
||||
|
||||
/* Defines for valid option names */
|
||||
#define OPTION_NAME_FILENAME "filename"
|
||||
#define OPTION_NAME_COMPRESSION_TYPE "compression"
|
||||
#define OPTION_NAME_STRIPE_ROW_COUNT "stripe_row_count"
|
||||
#define OPTION_NAME_BLOCK_ROW_COUNT "block_row_count"
|
||||
|
@ -68,7 +68,6 @@ typedef enum
|
|||
*/
|
||||
typedef struct CStoreOptions
|
||||
{
|
||||
char *filename;
|
||||
CompressionType compressionType;
|
||||
uint64 stripeRowCount;
|
||||
uint32 blockRowCount;
|
||||
|
@ -203,10 +202,9 @@ typedef struct TableReadState
|
|||
{
|
||||
Oid relationId;
|
||||
|
||||
FILE *tableFile;
|
||||
TableMetadata *tableMetadata;
|
||||
TupleDesc tupleDescriptor;
|
||||
|
||||
Relation relation;
|
||||
/*
|
||||
* List of Var pointers for columns in the query. We use this both for
|
||||
* getting vector of projected columns, and also when we want to build
|
||||
|
@ -228,7 +226,6 @@ typedef struct TableReadState
|
|||
typedef struct TableWriteState
|
||||
{
|
||||
Oid relationId;
|
||||
FILE *tableFile;
|
||||
TableMetadata *tableMetadata;
|
||||
CompressionType compressionType;
|
||||
TupleDesc tupleDescriptor;
|
||||
|
@ -257,11 +254,9 @@ extern void InitializeCStoreTableFile(Oid relationId, Relation relation,
|
|||
CStoreOptions *cstoreOptions);
|
||||
extern void CreateCStoreDatabaseDirectory(Oid databaseOid);
|
||||
extern void RemoveCStoreDatabaseDirectory(Oid databaseOid);
|
||||
extern void DeleteCStoreTableFiles(char *filename);
|
||||
|
||||
/* Function declarations for writing to a cstore file */
|
||||
extern TableWriteState * CStoreBeginWrite(Oid relationId,
|
||||
const char *filename,
|
||||
CompressionType compressionType,
|
||||
uint64 stripeMaxRowCount,
|
||||
uint32 blockRowCount,
|
||||
|
@ -271,7 +266,7 @@ extern void CStoreWriteRow(TableWriteState *state, Datum *columnValues,
|
|||
extern void CStoreEndWrite(TableWriteState *state);
|
||||
|
||||
/* Function declarations for reading from a cstore file */
|
||||
extern TableReadState * CStoreBeginRead(Oid relationId, const char *filename,
|
||||
extern TableReadState * CStoreBeginRead(Oid relationId,
|
||||
TupleDesc tupleDescriptor,
|
||||
List *projectedColumnList, List *qualConditions);
|
||||
extern bool CStoreReadFinished(TableReadState *state);
|
||||
|
@ -286,7 +281,7 @@ extern ColumnBlockData ** CreateEmptyBlockDataArray(uint32 columnCount, bool *co
|
|||
uint32 blockRowCount);
|
||||
extern void FreeColumnBlockDataArray(ColumnBlockData **blockDataArray,
|
||||
uint32 columnCount);
|
||||
extern uint64 CStoreTableRowCount(Oid relid, const char *filename);
|
||||
extern uint64 CStoreTableRowCount(Relation relation);
|
||||
extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer,
|
||||
CompressionType compressionType);
|
||||
extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType);
|
||||
|
@ -294,8 +289,27 @@ extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressio
|
|||
/* cstore_metadata_tables.c */
|
||||
extern void SaveStripeFooter(Oid relid, uint64 stripe, StripeFooter *footer);
|
||||
extern StripeFooter * ReadStripeFooter(Oid relid, uint64 stripe, int relationColumnCount);
|
||||
|
||||
extern void InitCStoreTableMetadata(Oid relid, int blockRowCount);
|
||||
extern void InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe);
|
||||
extern TableMetadata * ReadTableMetadata(Oid relid);
|
||||
|
||||
typedef struct SmgrAddr
|
||||
{
|
||||
BlockNumber blockno;
|
||||
uint32 offset;
|
||||
} SmgrAddr;
|
||||
|
||||
static inline SmgrAddr
|
||||
logical_to_smgr(uint64 logicalOffset)
|
||||
{
|
||||
uint64 bytes_per_page = BLCKSZ - SizeOfPageHeaderData;
|
||||
SmgrAddr addr;
|
||||
|
||||
addr.blockno = logicalOffset / bytes_per_page;
|
||||
addr.offset = logicalOffset % bytes_per_page;
|
||||
|
||||
return addr;
|
||||
}
|
||||
|
||||
#endif /* CSTORE_H */
|
||||
|
|
268
cstore_fdw.c
268
cstore_fdw.c
|
@ -100,7 +100,6 @@ static const uint32 ValidOptionCount = 4;
|
|||
static const CStoreValidOption ValidOptionArray[] =
|
||||
{
|
||||
/* foreign table options */
|
||||
{ OPTION_NAME_FILENAME, ForeignTableRelationId },
|
||||
{ OPTION_NAME_COMPRESSION_TYPE, ForeignTableRelationId },
|
||||
{ OPTION_NAME_STRIPE_ROW_COUNT, ForeignTableRelationId },
|
||||
{ OPTION_NAME_BLOCK_ROW_COUNT, ForeignTableRelationId }
|
||||
|
@ -130,7 +129,7 @@ static void CStoreProcessAlterTableCommand(AlterTableStmt *alterStatement);
|
|||
static List * DroppedCStoreRelidList(DropStmt *dropStatement);
|
||||
static List * FindCStoreTables(List *tableList);
|
||||
static List * OpenRelationsForTruncate(List *cstoreTableList);
|
||||
static void InitializeRelFileNode(Relation relation);
|
||||
static void InitializeRelFileNode(Relation relation, bool force);
|
||||
static void TruncateCStoreTables(List *cstoreRelationList);
|
||||
static bool CStoreTable(Oid relationId);
|
||||
static bool CStoreServer(ForeignServer *server);
|
||||
|
@ -140,10 +139,9 @@ static StringInfo OptionNamesString(Oid currentContextId);
|
|||
static HeapTuple GetSlotHeapTuple(TupleTableSlot *tts);
|
||||
static CStoreOptions * CStoreGetOptions(Oid foreignTableId);
|
||||
static char * CStoreGetOptionValue(Oid foreignTableId, const char *optionName);
|
||||
static void ValidateForeignTableOptions(char *filename, char *compressionTypeString,
|
||||
static void ValidateForeignTableOptions(char *compressionTypeString,
|
||||
char *stripeRowCountString,
|
||||
char *blockRowCountString);
|
||||
static char * CStoreDefaultFilePath(Oid foreignTableId);
|
||||
static void CStoreGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel,
|
||||
Oid foreignTableId);
|
||||
static void CStoreGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel,
|
||||
|
@ -158,8 +156,8 @@ static ForeignScan * CStoreGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel
|
|||
Oid foreignTableId, ForeignPath *bestPath,
|
||||
List *targetList, List *scanClauses);
|
||||
#endif
|
||||
static double TupleCountEstimate(Oid relid, RelOptInfo *baserel, const char *filename);
|
||||
static BlockNumber PageCount(const char *filename);
|
||||
static double TupleCountEstimate(Relation relation, RelOptInfo *baserel);
|
||||
static BlockNumber PageCount(Relation relation);
|
||||
static List * ColumnList(RelOptInfo *baserel, Oid foreignTableId);
|
||||
static void CStoreExplainForeignScan(ForeignScanState *scanState,
|
||||
ExplainState *explainState);
|
||||
|
@ -250,17 +248,7 @@ cstore_ddl_event_end_trigger(PG_FUNCTION_ARGS)
|
|||
triggerData = (EventTriggerData *) fcinfo->context;
|
||||
parseTree = triggerData->parsetree;
|
||||
|
||||
if (nodeTag(parseTree) == T_CreateForeignServerStmt)
|
||||
{
|
||||
CreateForeignServerStmt *serverStatement = (CreateForeignServerStmt *) parseTree;
|
||||
|
||||
char *foreignWrapperName = serverStatement->fdwname;
|
||||
if (strncmp(foreignWrapperName, CSTORE_FDW_NAME, NAMEDATALEN) == 0)
|
||||
{
|
||||
CreateCStoreDatabaseDirectory(MyDatabaseId);
|
||||
}
|
||||
}
|
||||
else if (nodeTag(parseTree) == T_CreateForeignTableStmt)
|
||||
if (nodeTag(parseTree) == T_CreateForeignTableStmt)
|
||||
{
|
||||
CreateForeignTableStmt *createStatement = (CreateForeignTableStmt *) parseTree;
|
||||
char *serverName = createStatement->servername;
|
||||
|
@ -280,8 +268,6 @@ cstore_ddl_event_end_trigger(PG_FUNCTION_ARGS)
|
|||
* We have no chance to hook into server creation to create data
|
||||
* directory for it during database creation time.
|
||||
*/
|
||||
CreateCStoreDatabaseDirectory(MyDatabaseId);
|
||||
|
||||
InitializeCStoreTableFile(relationId, relation, CStoreGetOptions(relationId));
|
||||
heap_close(relation, AccessExclusiveLock);
|
||||
}
|
||||
|
@ -361,16 +347,10 @@ CStoreProcessUtility(Node * parseTree, const char * queryString,
|
|||
|
||||
CALL_PREVIOUS_UTILITY(parseTree, queryString, context, paramListInfo,
|
||||
destReceiver, completionTag);
|
||||
|
||||
if (removeCStoreDirectory)
|
||||
{
|
||||
RemoveCStoreDatabaseDirectory(MyDatabaseId);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
List *dropRelids = DroppedCStoreRelidList((DropStmt *) parseTree);
|
||||
List *dropFiles = NIL;
|
||||
ListCell *lc = NULL;
|
||||
|
||||
/* drop smgr storage */
|
||||
|
@ -378,35 +358,14 @@ CStoreProcessUtility(Node * parseTree, const char * queryString,
|
|||
{
|
||||
Oid relid = lfirst_oid(lc);
|
||||
Relation relation = cstore_fdw_open(relid, AccessExclusiveLock);
|
||||
CStoreOptions *cstoreOptions = CStoreGetOptions(relid);
|
||||
char *defaultfilename = CStoreDefaultFilePath(relid);
|
||||
|
||||
RelationOpenSmgr(relation);
|
||||
RelationDropStorage(relation);
|
||||
heap_close(relation, AccessExclusiveLock);
|
||||
|
||||
/*
|
||||
* Skip files that are placed in default location, they are handled
|
||||
* by sql drop trigger. Both paths are generated by code, use
|
||||
* of strcmp is safe here.
|
||||
*/
|
||||
if (strcmp(defaultfilename, cstoreOptions->filename) == 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
dropFiles = lappend(dropFiles, cstoreOptions->filename);
|
||||
}
|
||||
|
||||
CALL_PREVIOUS_UTILITY(parseTree, queryString, context, paramListInfo,
|
||||
destReceiver, completionTag);
|
||||
|
||||
/* drop files */
|
||||
foreach(lc, dropFiles)
|
||||
{
|
||||
char *filename = lfirst(lc);
|
||||
DeleteCStoreTableFiles(filename);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (nodeTag(parseTree) == T_TruncateStmt)
|
||||
|
@ -449,18 +408,9 @@ CStoreProcessUtility(Node * parseTree, const char * queryString,
|
|||
}
|
||||
else if (nodeTag(parseTree) == T_DropdbStmt)
|
||||
{
|
||||
DropdbStmt *dropDdStmt = (DropdbStmt *) parseTree;
|
||||
bool missingOk = true;
|
||||
Oid databaseOid = get_database_oid(dropDdStmt->dbname, missingOk);
|
||||
|
||||
/* let postgres handle error checking and dropping of the database */
|
||||
CALL_PREVIOUS_UTILITY(parseTree, queryString, context, paramListInfo,
|
||||
destReceiver, completionTag);
|
||||
|
||||
if (databaseOid != InvalidOid)
|
||||
{
|
||||
RemoveCStoreDatabaseDirectory(databaseOid);
|
||||
}
|
||||
}
|
||||
|
||||
/* handle other utility statements */
|
||||
|
@ -642,11 +592,11 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString)
|
|||
|
||||
/* init state to write to the cstore file */
|
||||
writeState = CStoreBeginWrite(relationId,
|
||||
cstoreOptions->filename,
|
||||
cstoreOptions->compressionType,
|
||||
cstoreOptions->stripeRowCount,
|
||||
cstoreOptions->blockRowCount,
|
||||
tupleDescriptor);
|
||||
writeState->relation = relation;
|
||||
|
||||
while (nextRowFound)
|
||||
{
|
||||
|
@ -912,12 +862,7 @@ TruncateCStoreTables(List *cstoreRelationList)
|
|||
Assert(CStoreTable(relationId));
|
||||
|
||||
cstoreOptions = CStoreGetOptions(relationId);
|
||||
if (OidIsValid(relation->rd_rel->relfilenode))
|
||||
{
|
||||
RelationOpenSmgr(relation);
|
||||
RelationDropStorage(relation);
|
||||
}
|
||||
DeleteCStoreTableFiles(cstoreOptions->filename);
|
||||
InitializeRelFileNode(relation, true);
|
||||
InitializeCStoreTableFile(relationId, relation, CStoreGetOptions(relationId));
|
||||
}
|
||||
}
|
||||
|
@ -927,12 +872,12 @@ TruncateCStoreTables(List *cstoreRelationList)
|
|||
* tables. Version 12 and later do not, so we need to create one manually.
|
||||
*/
|
||||
static void
|
||||
InitializeRelFileNode(Relation relation)
|
||||
InitializeRelFileNode(Relation relation, bool force)
|
||||
{
|
||||
#if PG_VERSION_NUM >= 120000
|
||||
Relation pg_class;
|
||||
HeapTuple tuple;
|
||||
Form_pg_class classform;
|
||||
Relation pg_class;
|
||||
HeapTuple tuple;
|
||||
Form_pg_class classform;
|
||||
|
||||
/*
|
||||
* Get a writable copy of the pg_class tuple for the given relation.
|
||||
|
@ -946,12 +891,12 @@ InitializeRelFileNode(Relation relation)
|
|||
RelationGetRelid(relation));
|
||||
classform = (Form_pg_class) GETSTRUCT(tuple);
|
||||
|
||||
if (!OidIsValid(classform->relfilenode))
|
||||
if (!OidIsValid(classform->relfilenode) || force)
|
||||
{
|
||||
char persistence = relation->rd_rel->relpersistence;
|
||||
Relation tmprel;
|
||||
Oid tablespace;
|
||||
Oid filenode = relation->rd_id;
|
||||
char persistence = relation->rd_rel->relpersistence;
|
||||
Oid filenode;
|
||||
RelFileNode newrnode;
|
||||
SMgrRelation srel;
|
||||
|
||||
|
@ -968,6 +913,8 @@ InitializeRelFileNode(Relation relation)
|
|||
else
|
||||
tablespace = MyDatabaseTableSpace;
|
||||
|
||||
filenode = GetNewRelFileNode(tablespace, NULL, persistence);
|
||||
|
||||
newrnode.spcNode = tablespace;
|
||||
newrnode.dbNode = MyDatabaseId;
|
||||
newrnode.relNode = filenode;
|
||||
|
@ -1120,32 +1067,20 @@ Datum
|
|||
cstore_table_size(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid relationId = PG_GETARG_OID(0);
|
||||
|
||||
int64 tableSize = 0;
|
||||
CStoreOptions *cstoreOptions = NULL;
|
||||
char *dataFilename = NULL;
|
||||
int dataFileStatResult = 0;
|
||||
struct stat dataFileStatBuffer;
|
||||
|
||||
bool cstoreTable = CStoreTable(relationId);
|
||||
Relation relation;
|
||||
BlockNumber nblocks;
|
||||
|
||||
if (!cstoreTable)
|
||||
{
|
||||
ereport(ERROR, (errmsg("relation is not a cstore table")));
|
||||
}
|
||||
|
||||
cstoreOptions = CStoreGetOptions(relationId);
|
||||
dataFilename = cstoreOptions->filename;
|
||||
|
||||
dataFileStatResult = stat(dataFilename, &dataFileStatBuffer);
|
||||
if (dataFileStatResult != 0)
|
||||
{
|
||||
ereport(ERROR, (errcode_for_file_access(),
|
||||
errmsg("could not stat file \"%s\": %m", dataFilename)));
|
||||
}
|
||||
|
||||
tableSize += dataFileStatBuffer.st_size;
|
||||
|
||||
PG_RETURN_INT64(tableSize);
|
||||
relation = cstore_fdw_open(relationId, AccessShareLock);
|
||||
RelationOpenSmgr(relation);
|
||||
nblocks = smgrnblocks(relation->rd_smgr, MAIN_FORKNUM);
|
||||
heap_close(relation, AccessShareLock);
|
||||
PG_RETURN_INT64(nblocks * BLCKSZ);
|
||||
}
|
||||
|
||||
|
||||
|
@ -1197,7 +1132,6 @@ cstore_fdw_validator(PG_FUNCTION_ARGS)
|
|||
Oid optionContextId = PG_GETARG_OID(1);
|
||||
List *optionList = untransformRelOptions(optionArray);
|
||||
ListCell *optionCell = NULL;
|
||||
char *filename = NULL;
|
||||
char *compressionTypeString = NULL;
|
||||
char *stripeRowCountString = NULL;
|
||||
char *blockRowCountString = NULL;
|
||||
|
@ -1232,11 +1166,7 @@ cstore_fdw_validator(PG_FUNCTION_ARGS)
|
|||
optionNamesString->data)));
|
||||
}
|
||||
|
||||
if (strncmp(optionName, OPTION_NAME_FILENAME, NAMEDATALEN) == 0)
|
||||
{
|
||||
filename = defGetString(optionDef);
|
||||
}
|
||||
else if (strncmp(optionName, OPTION_NAME_COMPRESSION_TYPE, NAMEDATALEN) == 0)
|
||||
if (strncmp(optionName, OPTION_NAME_COMPRESSION_TYPE, NAMEDATALEN) == 0)
|
||||
{
|
||||
compressionTypeString = defGetString(optionDef);
|
||||
}
|
||||
|
@ -1252,7 +1182,7 @@ cstore_fdw_validator(PG_FUNCTION_ARGS)
|
|||
|
||||
if (optionContextId == ForeignTableRelationId)
|
||||
{
|
||||
ValidateForeignTableOptions(filename, compressionTypeString,
|
||||
ValidateForeignTableOptions(compressionTypeString,
|
||||
stripeRowCountString, blockRowCountString);
|
||||
}
|
||||
|
||||
|
@ -1271,11 +1201,6 @@ cstore_fdw_validator(PG_FUNCTION_ARGS)
|
|||
Datum
|
||||
cstore_clean_table_resources(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid relationId = PG_GETARG_OID(0);
|
||||
StringInfo filePath = makeStringInfo();
|
||||
struct stat fileStat;
|
||||
int statResult = -1;
|
||||
|
||||
/*
|
||||
* TODO: Event triggers do not offer the relfilenode of the
|
||||
* dropped table, and by the time the sql_drop event trigger
|
||||
|
@ -1285,19 +1210,6 @@ cstore_clean_table_resources(PG_FUNCTION_ARGS)
|
|||
* leak storage.
|
||||
*/
|
||||
|
||||
appendStringInfo(filePath, "%s/%s/%d/%d", DataDir, CSTORE_FDW_NAME,
|
||||
(int) MyDatabaseId, (int) relationId);
|
||||
|
||||
/*
|
||||
* Check to see if the file exist first. This is the only way to
|
||||
* find out if the table being dropped is a cstore table.
|
||||
*/
|
||||
statResult = stat(filePath->data, &fileStat);
|
||||
if (statResult == 0)
|
||||
{
|
||||
DeleteCStoreTableFiles(filePath->data);
|
||||
}
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
@ -1359,7 +1271,6 @@ static CStoreOptions *
|
|||
CStoreGetOptions(Oid foreignTableId)
|
||||
{
|
||||
CStoreOptions *cstoreOptions = NULL;
|
||||
char *filename = NULL;
|
||||
CompressionType compressionType = DEFAULT_COMPRESSION_TYPE;
|
||||
int32 stripeRowCount = DEFAULT_STRIPE_ROW_COUNT;
|
||||
int32 blockRowCount = DEFAULT_BLOCK_ROW_COUNT;
|
||||
|
@ -1367,7 +1278,6 @@ CStoreGetOptions(Oid foreignTableId)
|
|||
char *stripeRowCountString = NULL;
|
||||
char *blockRowCountString = NULL;
|
||||
|
||||
filename = CStoreGetOptionValue(foreignTableId, OPTION_NAME_FILENAME);
|
||||
compressionTypeString = CStoreGetOptionValue(foreignTableId,
|
||||
OPTION_NAME_COMPRESSION_TYPE);
|
||||
stripeRowCountString = CStoreGetOptionValue(foreignTableId,
|
||||
|
@ -1375,7 +1285,7 @@ CStoreGetOptions(Oid foreignTableId)
|
|||
blockRowCountString = CStoreGetOptionValue(foreignTableId,
|
||||
OPTION_NAME_BLOCK_ROW_COUNT);
|
||||
|
||||
ValidateForeignTableOptions(filename, compressionTypeString,
|
||||
ValidateForeignTableOptions(compressionTypeString,
|
||||
stripeRowCountString, blockRowCountString);
|
||||
|
||||
/* parse provided options */
|
||||
|
@ -1392,14 +1302,7 @@ CStoreGetOptions(Oid foreignTableId)
|
|||
blockRowCount = pg_atoi(blockRowCountString, sizeof(int32), 0);
|
||||
}
|
||||
|
||||
/* set default filename if it is not provided */
|
||||
if (filename == NULL)
|
||||
{
|
||||
filename = CStoreDefaultFilePath(foreignTableId);
|
||||
}
|
||||
|
||||
cstoreOptions = palloc0(sizeof(CStoreOptions));
|
||||
cstoreOptions->filename = filename;
|
||||
cstoreOptions->compressionType = compressionType;
|
||||
cstoreOptions->stripeRowCount = stripeRowCount;
|
||||
cstoreOptions->blockRowCount = blockRowCount;
|
||||
|
@ -1450,12 +1353,9 @@ CStoreGetOptionValue(Oid foreignTableId, const char *optionName)
|
|||
* considered invalid.
|
||||
*/
|
||||
static void
|
||||
ValidateForeignTableOptions(char *filename, char *compressionTypeString,
|
||||
ValidateForeignTableOptions(char *compressionTypeString,
|
||||
char *stripeRowCountString, char *blockRowCountString)
|
||||
{
|
||||
/* we currently do not have any checks for filename */
|
||||
(void) filename;
|
||||
|
||||
/* check if the provided compression type is valid */
|
||||
if (compressionTypeString != NULL)
|
||||
{
|
||||
|
@ -1500,36 +1400,6 @@ ValidateForeignTableOptions(char *filename, char *compressionTypeString,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* CStoreDefaultFilePath constructs the default file path to use for a cstore_fdw
|
||||
* table. The path is of the form $PGDATA/cstore_fdw/{databaseOid}/{relfilenode}.
|
||||
*/
|
||||
static char *
|
||||
CStoreDefaultFilePath(Oid foreignTableId)
|
||||
{
|
||||
StringInfo cstoreFilePath = NULL;
|
||||
Relation relation = cstore_fdw_open(foreignTableId, AccessShareLock);
|
||||
RelFileNode relationFileNode = relation->rd_node;
|
||||
Oid databaseOid = relationFileNode.dbNode;
|
||||
Oid relationFileOid = relationFileNode.relNode;
|
||||
|
||||
relation_close(relation, AccessShareLock);
|
||||
|
||||
/* PG12 onward does not create relfilenode for foreign tables */
|
||||
if (databaseOid == InvalidOid)
|
||||
{
|
||||
databaseOid = MyDatabaseId;
|
||||
relationFileOid = foreignTableId;
|
||||
}
|
||||
|
||||
cstoreFilePath = makeStringInfo();
|
||||
appendStringInfo(cstoreFilePath, "%s/%s/%u/%u", DataDir, CSTORE_FDW_NAME,
|
||||
databaseOid, relationFileOid);
|
||||
|
||||
return cstoreFilePath->data;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CStoreGetForeignRelSize obtains relation size estimates for a foreign table and
|
||||
* puts its estimate for row count into baserel->rows.
|
||||
|
@ -1537,14 +1407,14 @@ CStoreDefaultFilePath(Oid foreignTableId)
|
|||
static void
|
||||
CStoreGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId)
|
||||
{
|
||||
CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId);
|
||||
double tupleCountEstimate = TupleCountEstimate(foreignTableId, baserel,
|
||||
cstoreOptions->filename);
|
||||
Relation relation = cstore_fdw_open(foreignTableId, AccessShareLock);
|
||||
double tupleCountEstimate = TupleCountEstimate(relation, baserel);
|
||||
double rowSelectivity = clauselist_selectivity(root, baserel->baserestrictinfo,
|
||||
0, JOIN_INNER, NULL);
|
||||
|
||||
double outputRowCount = clamp_row_est(tupleCountEstimate * rowSelectivity);
|
||||
baserel->rows = outputRowCount;
|
||||
heap_close(relation, AccessShareLock);
|
||||
}
|
||||
|
||||
|
||||
|
@ -1558,7 +1428,6 @@ static void
|
|||
CStoreGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId)
|
||||
{
|
||||
Path *foreignScanPath = NULL;
|
||||
CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId);
|
||||
Relation relation = cstore_fdw_open(foreignTableId, AccessShareLock);
|
||||
|
||||
/*
|
||||
|
@ -1579,15 +1448,14 @@ CStoreGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId
|
|||
*/
|
||||
List *queryColumnList = ColumnList(baserel, foreignTableId);
|
||||
uint32 queryColumnCount = list_length(queryColumnList);
|
||||
BlockNumber relationPageCount = PageCount(cstoreOptions->filename);
|
||||
BlockNumber relationPageCount = PageCount(relation);
|
||||
uint32 relationColumnCount = RelationGetNumberOfAttributes(relation);
|
||||
|
||||
double queryColumnRatio = (double) queryColumnCount / relationColumnCount;
|
||||
double queryPageCount = relationPageCount * queryColumnRatio;
|
||||
double totalDiskAccessCost = seq_page_cost * queryPageCount;
|
||||
|
||||
double tupleCountEstimate = TupleCountEstimate(foreignTableId, baserel,
|
||||
cstoreOptions->filename);
|
||||
double tupleCountEstimate = TupleCountEstimate(relation, baserel);
|
||||
|
||||
/*
|
||||
* We estimate costs almost the same way as cost_seqscan(), thus assuming
|
||||
|
@ -1692,7 +1560,7 @@ CStoreGetForeignPlan(PlannerInfo * root, RelOptInfo * baserel, Oid foreignTableI
|
|||
* file.
|
||||
*/
|
||||
static double
|
||||
TupleCountEstimate(Oid relid, RelOptInfo *baserel, const char *filename)
|
||||
TupleCountEstimate(Relation relation, RelOptInfo *baserel)
|
||||
{
|
||||
double tupleCountEstimate = 0.0;
|
||||
|
||||
|
@ -1705,13 +1573,13 @@ TupleCountEstimate(Oid relid, RelOptInfo *baserel, const char *filename)
|
|||
* that by the current file size.
|
||||
*/
|
||||
double tupleDensity = baserel->tuples / (double) baserel->pages;
|
||||
BlockNumber pageCount = PageCount(filename);
|
||||
BlockNumber pageCount = PageCount(relation);
|
||||
|
||||
tupleCountEstimate = clamp_row_est(tupleDensity * (double) pageCount);
|
||||
}
|
||||
else
|
||||
{
|
||||
tupleCountEstimate = (double) CStoreTableRowCount(relid, filename);
|
||||
tupleCountEstimate = (double) CStoreTableRowCount(relation);
|
||||
}
|
||||
|
||||
return tupleCountEstimate;
|
||||
|
@ -1720,25 +1588,14 @@ TupleCountEstimate(Oid relid, RelOptInfo *baserel, const char *filename)
|
|||
|
||||
/* PageCount calculates and returns the number of pages in a file. */
|
||||
static BlockNumber
|
||||
PageCount(const char *filename)
|
||||
PageCount(Relation relation)
|
||||
{
|
||||
BlockNumber pageCount = 0;
|
||||
struct stat statBuffer;
|
||||
BlockNumber nblocks;
|
||||
|
||||
/* if file doesn't exist at plan time, use default estimate for its size */
|
||||
int statResult = stat(filename, &statBuffer);
|
||||
if (statResult < 0)
|
||||
{
|
||||
statBuffer.st_size = 10 * BLCKSZ;
|
||||
}
|
||||
RelationOpenSmgr(relation);
|
||||
nblocks = smgrnblocks(relation->rd_smgr, MAIN_FORKNUM);
|
||||
|
||||
pageCount = (statBuffer.st_size + (BLCKSZ - 1)) / BLCKSZ;
|
||||
if (pageCount < 1)
|
||||
{
|
||||
pageCount = 1;
|
||||
}
|
||||
|
||||
return pageCount;
|
||||
return (nblocks > 0) ? nblocks : 1;
|
||||
}
|
||||
|
||||
|
||||
|
@ -1856,25 +1713,18 @@ ColumnList(RelOptInfo *baserel, Oid foreignTableId)
|
|||
static void
|
||||
CStoreExplainForeignScan(ForeignScanState *scanState, ExplainState *explainState)
|
||||
{
|
||||
Relation relation = scanState->ss.ss_currentRelation;
|
||||
CStoreOptions *cstoreOptions;
|
||||
Oid foreignTableId;
|
||||
Relation relation = scanState->ss.ss_currentRelation;
|
||||
|
||||
cstore_fdw_initrel(relation);
|
||||
foreignTableId = RelationGetRelid(relation);
|
||||
cstoreOptions = CStoreGetOptions(foreignTableId);
|
||||
|
||||
/* supress file size if we're not showing cost details */
|
||||
if (explainState->costs)
|
||||
{
|
||||
struct stat statBuffer;
|
||||
|
||||
int statResult = stat(cstoreOptions->filename, &statBuffer);
|
||||
if (statResult == 0)
|
||||
{
|
||||
ExplainPropertyLong("CStore File Size", (long) statBuffer.st_size,
|
||||
explainState);
|
||||
}
|
||||
long nblocks;
|
||||
RelationOpenSmgr(relation);
|
||||
nblocks = smgrnblocks(relation->rd_smgr, MAIN_FORKNUM);
|
||||
ExplainPropertyLong("CStore File Size", (long) (nblocks * BLCKSZ),
|
||||
explainState);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1909,8 +1759,9 @@ CStoreBeginForeignScan(ForeignScanState *scanState, int executorFlags)
|
|||
whereClauseList = foreignScan->scan.plan.qual;
|
||||
|
||||
columnList = (List *) linitial(foreignPrivateList);
|
||||
readState = CStoreBeginRead(foreignTableId, cstoreOptions->filename,
|
||||
readState = CStoreBeginRead(foreignTableId,
|
||||
tupleDescriptor, columnList, whereClauseList);
|
||||
readState->relation = cstore_fdw_open(foreignTableId, AccessShareLock);
|
||||
|
||||
scanState->fdw_state = (void *) readState;
|
||||
}
|
||||
|
@ -1956,6 +1807,7 @@ CStoreEndForeignScan(ForeignScanState *scanState)
|
|||
TableReadState *readState = (TableReadState *) scanState->fdw_state;
|
||||
if (readState != NULL)
|
||||
{
|
||||
heap_close(readState->relation, AccessShareLock);
|
||||
CStoreEndRead(readState);
|
||||
}
|
||||
}
|
||||
|
@ -1979,22 +1831,9 @@ CStoreAnalyzeForeignTable(Relation relation,
|
|||
AcquireSampleRowsFunc *acquireSampleRowsFunc,
|
||||
BlockNumber *totalPageCount)
|
||||
{
|
||||
Oid foreignTableId = RelationGetRelid(relation);
|
||||
CStoreOptions *cstoreOptions;
|
||||
struct stat statBuffer;
|
||||
|
||||
cstore_fdw_initrel(relation);
|
||||
cstoreOptions = CStoreGetOptions(foreignTableId);
|
||||
|
||||
int statResult = stat(cstoreOptions->filename, &statBuffer);
|
||||
if (statResult < 0)
|
||||
{
|
||||
ereport(ERROR, (errcode_for_file_access(),
|
||||
errmsg("could not stat file \"%s\": %m",
|
||||
cstoreOptions->filename)));
|
||||
}
|
||||
|
||||
(*totalPageCount) = PageCount(cstoreOptions->filename);
|
||||
RelationOpenSmgr(relation);
|
||||
(*totalPageCount) = smgrnblocks(relation->rd_smgr, MAIN_FORKNUM);
|
||||
(*acquireSampleRowsFunc) = CStoreAcquireSampleRows;
|
||||
|
||||
return true;
|
||||
|
@ -2267,7 +2106,6 @@ CStoreBeginForeignInsert(ModifyTableState *modifyTableState, ResultRelInfo *rela
|
|||
tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc);
|
||||
|
||||
writeState = CStoreBeginWrite(foreignTableOid,
|
||||
cstoreOptions->filename,
|
||||
cstoreOptions->compressionType,
|
||||
cstoreOptions->stripeRowCount,
|
||||
cstoreOptions->blockRowCount,
|
||||
|
@ -2372,7 +2210,7 @@ cstore_fdw_initrel(Relation rel)
|
|||
{
|
||||
#if PG_VERSION_NUM >= 120000
|
||||
if (rel->rd_rel->relfilenode == InvalidOid)
|
||||
InitializeRelFileNode(rel);
|
||||
InitializeRelFileNode(rel, false);
|
||||
|
||||
/*
|
||||
* Copied code from RelationInitPhysicalAddr(), which doesn't
|
||||
|
|
115
cstore_reader.c
115
cstore_reader.c
|
@ -38,7 +38,7 @@
|
|||
#include "cstore_version_compat.h"
|
||||
|
||||
/* static function declarations */
|
||||
static StripeBuffers * LoadFilteredStripeBuffers(FILE *tableFile,
|
||||
static StripeBuffers * LoadFilteredStripeBuffers(Relation relation,
|
||||
StripeMetadata *stripeMetadata,
|
||||
StripeFooter *stripeFooter,
|
||||
TupleDesc tupleDescriptor,
|
||||
|
@ -48,12 +48,12 @@ static void ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColum
|
|||
uint64 blockIndex, uint64 blockRowIndex,
|
||||
ColumnBlockData **blockDataArray,
|
||||
Datum *columnValues, bool *columnNulls);
|
||||
static ColumnBuffers * LoadColumnBuffers(FILE *tableFile,
|
||||
static ColumnBuffers * LoadColumnBuffers(Relation relation,
|
||||
ColumnBlockSkipNode *blockSkipNodeArray,
|
||||
uint32 blockCount, uint64 existsFileOffset,
|
||||
uint64 valueFileOffset,
|
||||
Form_pg_attribute attributeForm);
|
||||
static StripeSkipList * LoadStripeSkipList(FILE *tableFile,
|
||||
static StripeSkipList * LoadStripeSkipList(Relation relation,
|
||||
StripeMetadata *stripeMetadata,
|
||||
StripeFooter *stripeFooter,
|
||||
uint32 columnCount,
|
||||
|
@ -82,10 +82,10 @@ static void DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex
|
|||
TupleDesc tupleDescriptor);
|
||||
static Datum ColumnDefaultValue(TupleConstr *tupleConstraints,
|
||||
Form_pg_attribute attributeForm);
|
||||
static StringInfo ReadFromFile(FILE *file, uint64 offset, uint32 size);
|
||||
static StringInfo ReadFromSmgr(Relation rel, uint64 offset, uint32 size);
|
||||
static void ResetUncompressedBlockData(ColumnBlockData **blockDataArray,
|
||||
uint32 columnCount);
|
||||
static uint64 StripeRowCount(Oid relid, FILE *tableFile, StripeMetadata *stripeMetadata);
|
||||
static uint64 StripeRowCount(Relation relation, StripeMetadata *stripeMetadata);
|
||||
static int RelationColumnCount(Oid relid);
|
||||
|
||||
|
||||
|
@ -94,12 +94,11 @@ static int RelationColumnCount(Oid relid);
|
|||
* read handle that's used during reading rows and finishing the read operation.
|
||||
*/
|
||||
TableReadState *
|
||||
CStoreBeginRead(Oid relationId, const char *filename, TupleDesc tupleDescriptor,
|
||||
CStoreBeginRead(Oid relationId, TupleDesc tupleDescriptor,
|
||||
List *projectedColumnList, List *whereClauseList)
|
||||
{
|
||||
TableReadState *readState = NULL;
|
||||
TableMetadata *tableMetadata = NULL;
|
||||
FILE *tableFile = NULL;
|
||||
MemoryContext stripeReadContext = NULL;
|
||||
uint32 columnCount = 0;
|
||||
bool *projectedColumnMask = NULL;
|
||||
|
@ -107,14 +106,6 @@ CStoreBeginRead(Oid relationId, const char *filename, TupleDesc tupleDescriptor,
|
|||
|
||||
tableMetadata = ReadTableMetadata(relationId);
|
||||
|
||||
tableFile = AllocateFile(filename, PG_BINARY_R);
|
||||
if (tableFile == NULL)
|
||||
{
|
||||
ereport(ERROR, (errcode_for_file_access(),
|
||||
errmsg("could not open file \"%s\" for reading: %m",
|
||||
filename)));
|
||||
}
|
||||
|
||||
/*
|
||||
* We allocate all stripe specific data in the stripeReadContext, and reset
|
||||
* this memory context before loading a new stripe. This is to avoid memory
|
||||
|
@ -131,7 +122,6 @@ CStoreBeginRead(Oid relationId, const char *filename, TupleDesc tupleDescriptor,
|
|||
|
||||
readState = palloc0(sizeof(TableReadState));
|
||||
readState->relationId = relationId;
|
||||
readState->tableFile = tableFile;
|
||||
readState->tableMetadata = tableMetadata;
|
||||
readState->projectedColumnList = projectedColumnList;
|
||||
readState->whereClauseList = whereClauseList;
|
||||
|
@ -187,7 +177,8 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu
|
|||
stripeFooter = ReadStripeFooter(readState->relationId,
|
||||
stripeMetadata->id,
|
||||
readState->tupleDescriptor->natts);
|
||||
stripeBuffers = LoadFilteredStripeBuffers(readState->tableFile, stripeMetadata,
|
||||
stripeBuffers = LoadFilteredStripeBuffers(readState->relation,
|
||||
stripeMetadata,
|
||||
stripeFooter,
|
||||
readState->tupleDescriptor,
|
||||
readState->projectedColumnList,
|
||||
|
@ -263,7 +254,6 @@ CStoreEndRead(TableReadState *readState)
|
|||
int columnCount = readState->tupleDescriptor->natts;
|
||||
|
||||
MemoryContextDelete(readState->stripeReadContext);
|
||||
FreeFile(readState->tableFile);
|
||||
list_free_deep(readState->tableMetadata->stripeMetadataList);
|
||||
FreeColumnBlockDataArray(readState->blockDataArray, columnCount);
|
||||
pfree(readState->tableMetadata);
|
||||
|
@ -326,30 +316,20 @@ FreeColumnBlockDataArray(ColumnBlockData **blockDataArray, uint32 columnCount)
|
|||
|
||||
/* CStoreTableRowCount returns the exact row count of a table using skiplists */
|
||||
uint64
|
||||
CStoreTableRowCount(Oid relid, const char *filename)
|
||||
CStoreTableRowCount(Relation relation)
|
||||
{
|
||||
TableMetadata *tableMetadata = NULL;
|
||||
FILE *tableFile;
|
||||
ListCell *stripeMetadataCell = NULL;
|
||||
uint64 totalRowCount = 0;
|
||||
|
||||
tableMetadata = ReadTableMetadata(relid);
|
||||
|
||||
tableFile = AllocateFile(filename, PG_BINARY_R);
|
||||
if (tableFile == NULL)
|
||||
{
|
||||
ereport(ERROR, (errcode_for_file_access(),
|
||||
errmsg("could not open file \"%s\" for reading: %m", filename)));
|
||||
}
|
||||
tableMetadata = ReadTableMetadata(relation->rd_id);
|
||||
|
||||
foreach(stripeMetadataCell, tableMetadata->stripeMetadataList)
|
||||
{
|
||||
StripeMetadata *stripeMetadata = (StripeMetadata *) lfirst(stripeMetadataCell);
|
||||
totalRowCount += StripeRowCount(relid, tableFile, stripeMetadata);
|
||||
totalRowCount += StripeRowCount(relation, stripeMetadata);
|
||||
}
|
||||
|
||||
FreeFile(tableFile);
|
||||
|
||||
return totalRowCount;
|
||||
}
|
||||
|
||||
|
@ -359,15 +339,15 @@ CStoreTableRowCount(Oid relid, const char *filename)
|
|||
* skip list, and returns number of rows for given stripe.
|
||||
*/
|
||||
static uint64
|
||||
StripeRowCount(Oid relid, FILE *tableFile, StripeMetadata *stripeMetadata)
|
||||
StripeRowCount(Relation relation, StripeMetadata *stripeMetadata)
|
||||
{
|
||||
uint64 rowCount = 0;
|
||||
StringInfo firstColumnSkipListBuffer = NULL;
|
||||
|
||||
StripeFooter *stripeFooter = ReadStripeFooter(relid, stripeMetadata->id,
|
||||
RelationColumnCount(relid));
|
||||
StripeFooter *stripeFooter = ReadStripeFooter(relation->rd_id, stripeMetadata->id,
|
||||
RelationColumnCount(relation->rd_id));
|
||||
|
||||
firstColumnSkipListBuffer = ReadFromFile(tableFile, stripeMetadata->fileOffset,
|
||||
firstColumnSkipListBuffer = ReadFromSmgr(relation, stripeMetadata->fileOffset,
|
||||
stripeFooter->skipListSizeArray[0]);
|
||||
rowCount = DeserializeRowCount(firstColumnSkipListBuffer);
|
||||
|
||||
|
@ -381,7 +361,7 @@ StripeRowCount(Oid relid, FILE *tableFile, StripeMetadata *stripeMetadata)
|
|||
* and only loads columns that are projected in the query.
|
||||
*/
|
||||
static StripeBuffers *
|
||||
LoadFilteredStripeBuffers(FILE *tableFile, StripeMetadata *stripeMetadata,
|
||||
LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
||||
StripeFooter *stripeFooter, TupleDesc tupleDescriptor,
|
||||
List *projectedColumnList, List *whereClauseList)
|
||||
{
|
||||
|
@ -393,7 +373,7 @@ LoadFilteredStripeBuffers(FILE *tableFile, StripeMetadata *stripeMetadata,
|
|||
|
||||
bool *projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList);
|
||||
|
||||
StripeSkipList *stripeSkipList = LoadStripeSkipList(tableFile, stripeMetadata,
|
||||
StripeSkipList *stripeSkipList = LoadStripeSkipList(relation, stripeMetadata,
|
||||
stripeFooter, columnCount,
|
||||
projectedColumnMask,
|
||||
tupleDescriptor);
|
||||
|
@ -423,7 +403,7 @@ LoadFilteredStripeBuffers(FILE *tableFile, StripeMetadata *stripeMetadata,
|
|||
Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, columnIndex);
|
||||
uint32 blockCount = selectedBlockSkipList->blockCount;
|
||||
|
||||
ColumnBuffers *columnBuffers = LoadColumnBuffers(tableFile, blockSkipNode,
|
||||
ColumnBuffers *columnBuffers = LoadColumnBuffers(relation, blockSkipNode,
|
||||
blockCount,
|
||||
existsFileOffset,
|
||||
valueFileOffset,
|
||||
|
@ -482,7 +462,7 @@ ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColumnList,
|
|||
* and lengths are retrieved from the column block skip node array.
|
||||
*/
|
||||
static ColumnBuffers *
|
||||
LoadColumnBuffers(FILE *tableFile, ColumnBlockSkipNode *blockSkipNodeArray,
|
||||
LoadColumnBuffers(Relation relation, ColumnBlockSkipNode *blockSkipNodeArray,
|
||||
uint32 blockCount, uint64 existsFileOffset, uint64 valueFileOffset,
|
||||
Form_pg_attribute attributeForm)
|
||||
{
|
||||
|
@ -505,7 +485,7 @@ LoadColumnBuffers(FILE *tableFile, ColumnBlockSkipNode *blockSkipNodeArray,
|
|||
{
|
||||
ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex];
|
||||
uint64 existsOffset = existsFileOffset + blockSkipNode->existsBlockOffset;
|
||||
StringInfo rawExistsBuffer = ReadFromFile(tableFile, existsOffset,
|
||||
StringInfo rawExistsBuffer = ReadFromSmgr(relation, existsOffset,
|
||||
blockSkipNode->existsLength);
|
||||
|
||||
blockBuffersArray[blockIndex]->existsBuffer = rawExistsBuffer;
|
||||
|
@ -517,7 +497,7 @@ LoadColumnBuffers(FILE *tableFile, ColumnBlockSkipNode *blockSkipNodeArray,
|
|||
ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex];
|
||||
CompressionType compressionType = blockSkipNode->valueCompressionType;
|
||||
uint64 valueOffset = valueFileOffset + blockSkipNode->valueBlockOffset;
|
||||
StringInfo rawValueBuffer = ReadFromFile(tableFile, valueOffset,
|
||||
StringInfo rawValueBuffer = ReadFromSmgr(relation, valueOffset,
|
||||
blockSkipNode->valueLength);
|
||||
|
||||
blockBuffersArray[blockIndex]->valueBuffer = rawValueBuffer;
|
||||
|
@ -533,7 +513,8 @@ LoadColumnBuffers(FILE *tableFile, ColumnBlockSkipNode *blockSkipNodeArray,
|
|||
|
||||
/* Reads the skip list for the given stripe. */
|
||||
static StripeSkipList *
|
||||
LoadStripeSkipList(FILE *tableFile, StripeMetadata *stripeMetadata,
|
||||
LoadStripeSkipList(Relation relation,
|
||||
StripeMetadata *stripeMetadata,
|
||||
StripeFooter *stripeFooter, uint32 columnCount,
|
||||
bool *projectedColumnMask,
|
||||
TupleDesc tupleDescriptor)
|
||||
|
@ -547,7 +528,7 @@ LoadStripeSkipList(FILE *tableFile, StripeMetadata *stripeMetadata,
|
|||
uint32 stripeColumnCount = stripeFooter->columnCount;
|
||||
|
||||
/* deserialize block count */
|
||||
firstColumnSkipListBuffer = ReadFromFile(tableFile, stripeMetadata->fileOffset,
|
||||
firstColumnSkipListBuffer = ReadFromSmgr(relation, stripeMetadata->fileOffset,
|
||||
stripeFooter->skipListSizeArray[0]);
|
||||
stripeBlockCount = DeserializeBlockCount(firstColumnSkipListBuffer);
|
||||
|
||||
|
@ -570,7 +551,7 @@ LoadStripeSkipList(FILE *tableFile, StripeMetadata *stripeMetadata,
|
|||
Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, columnIndex);
|
||||
|
||||
StringInfo columnSkipListBuffer =
|
||||
ReadFromFile(tableFile, currentColumnSkipListFileOffset,
|
||||
ReadFromSmgr(relation, currentColumnSkipListFileOffset,
|
||||
columnSkipListSize);
|
||||
ColumnBlockSkipNode *columnSkipList =
|
||||
DeserializeColumnSkipList(columnSkipListBuffer, attributeForm->attbyval,
|
||||
|
@ -1178,49 +1159,37 @@ ColumnDefaultValue(TupleConstr *tupleConstraints, Form_pg_attribute attributeFor
|
|||
return defaultValue;
|
||||
}
|
||||
|
||||
|
||||
/* Reads the given segment from the given file. */
|
||||
static StringInfo
|
||||
ReadFromFile(FILE *file, uint64 offset, uint32 size)
|
||||
ReadFromSmgr(Relation rel, uint64 offset, uint32 size)
|
||||
{
|
||||
int fseekResult = 0;
|
||||
int freadResult = 0;
|
||||
int fileError = 0;
|
||||
StringInfo resultBuffer = makeStringInfo();
|
||||
uint64 read = 0;
|
||||
|
||||
StringInfo resultBuffer = makeStringInfo();
|
||||
enlargeStringInfo(resultBuffer, size);
|
||||
resultBuffer->len = size;
|
||||
|
||||
if (size == 0)
|
||||
while (read < size)
|
||||
{
|
||||
return resultBuffer;
|
||||
}
|
||||
Buffer buffer;
|
||||
Page page;
|
||||
PageHeader phdr;
|
||||
uint32 to_read;
|
||||
SmgrAddr addr = logical_to_smgr(offset + read);
|
||||
uint32 pageoffset = addr.offset + SizeOfPageHeaderData;
|
||||
|
||||
errno = 0;
|
||||
fseekResult = fseeko(file, offset, SEEK_SET);
|
||||
if (fseekResult != 0)
|
||||
{
|
||||
ereport(ERROR, (errcode_for_file_access(),
|
||||
errmsg("could not seek in file: %m")));
|
||||
}
|
||||
buffer = ReadBuffer(rel, addr.blockno);
|
||||
page = BufferGetPage(buffer);
|
||||
phdr = (PageHeader)page;
|
||||
|
||||
freadResult = fread(resultBuffer->data, size, 1, file);
|
||||
if (freadResult != 1)
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not read enough data from file")));
|
||||
}
|
||||
|
||||
fileError = ferror(file);
|
||||
if (fileError != 0)
|
||||
{
|
||||
ereport(ERROR, (errcode_for_file_access(),
|
||||
errmsg("could not read file: %m")));
|
||||
to_read = Min(size - read, phdr->pd_upper - pageoffset);
|
||||
memcpy(resultBuffer->data + read, page + pageoffset, to_read);
|
||||
ReleaseBuffer(buffer);
|
||||
read += to_read;
|
||||
}
|
||||
|
||||
return resultBuffer;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ResetUncompressedBlockData iterates over deserialized column block data
|
||||
* and sets valueBuffer field to empty buffer. This field is allocated in stripe
|
||||
|
|
166
cstore_writer.c
166
cstore_writer.c
|
@ -16,12 +16,12 @@
|
|||
|
||||
#include "postgres.h"
|
||||
|
||||
#include <sys/stat.h>
|
||||
|
||||
#include "access/nbtree.h"
|
||||
#include "catalog/pg_am.h"
|
||||
#include "storage/fd.h"
|
||||
#include "storage/smgr.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "utils/rel.h"
|
||||
|
||||
#include "cstore.h"
|
||||
#include "cstore_metadata_serialization.h"
|
||||
|
@ -51,8 +51,6 @@ static void UpdateBlockSkipNodeMinMax(ColumnBlockSkipNode *blockSkipNode,
|
|||
static Datum DatumCopy(Datum datum, bool datumTypeByValue, int datumTypeLength);
|
||||
static void AppendStripeMetadata(TableMetadata *tableMetadata,
|
||||
StripeMetadata stripeMetadata);
|
||||
static void WriteToFile(FILE *file, void *data, uint32 dataLength);
|
||||
static void SyncAndCloseFile(FILE *file);
|
||||
static StringInfo CopyStringInfo(StringInfo sourceString);
|
||||
|
||||
|
||||
|
@ -65,12 +63,11 @@ static StringInfo CopyStringInfo(StringInfo sourceString);
|
|||
*/
|
||||
TableWriteState *
|
||||
CStoreBeginWrite(Oid relationId,
|
||||
const char *filename, CompressionType compressionType,
|
||||
CompressionType compressionType,
|
||||
uint64 stripeMaxRowCount, uint32 blockRowCount,
|
||||
TupleDesc tupleDescriptor)
|
||||
{
|
||||
TableWriteState *writeState = NULL;
|
||||
FILE *tableFile = NULL;
|
||||
TableMetadata *tableMetadata = NULL;
|
||||
FmgrInfo **comparisonFunctionArray = NULL;
|
||||
MemoryContext stripeWriteContext = NULL;
|
||||
|
@ -81,14 +78,6 @@ CStoreBeginWrite(Oid relationId,
|
|||
ColumnBlockData **blockData = NULL;
|
||||
uint64 currentStripeId = 0;
|
||||
|
||||
tableFile = AllocateFile(filename, "a+");
|
||||
if (tableFile == NULL)
|
||||
{
|
||||
ereport(ERROR, (errcode_for_file_access(),
|
||||
errmsg("could not open file \"%s\" for writing: %m",
|
||||
filename)));
|
||||
}
|
||||
|
||||
tableMetadata = ReadTableMetadata(relationId);
|
||||
|
||||
/*
|
||||
|
@ -99,7 +88,6 @@ CStoreBeginWrite(Oid relationId,
|
|||
{
|
||||
StripeMetadata *lastStripe = NULL;
|
||||
uint64 lastStripeSize = 0;
|
||||
int fseekResult = 0;
|
||||
|
||||
lastStripe = llast(tableMetadata->stripeMetadataList);
|
||||
lastStripeSize += lastStripe->skipListLength;
|
||||
|
@ -108,14 +96,6 @@ CStoreBeginWrite(Oid relationId,
|
|||
|
||||
currentFileOffset = lastStripe->fileOffset + lastStripeSize;
|
||||
currentStripeId = lastStripe->id + 1;
|
||||
|
||||
errno = 0;
|
||||
fseekResult = fseeko(tableFile, currentFileOffset, SEEK_SET);
|
||||
if (fseekResult != 0)
|
||||
{
|
||||
ereport(ERROR, (errcode_for_file_access(),
|
||||
errmsg("could not seek in file \"%s\": %m", filename)));
|
||||
}
|
||||
}
|
||||
|
||||
/* get comparison function pointers for each of the columns */
|
||||
|
@ -154,7 +134,6 @@ CStoreBeginWrite(Oid relationId,
|
|||
|
||||
writeState = palloc0(sizeof(TableWriteState));
|
||||
writeState->relationId = relationId;
|
||||
writeState->tableFile = tableFile;
|
||||
writeState->tableMetadata = tableMetadata;
|
||||
writeState->compressionType = compressionType;
|
||||
writeState->stripeMaxRowCount = stripeMaxRowCount;
|
||||
|
@ -312,8 +291,6 @@ CStoreEndWrite(TableWriteState *writeState)
|
|||
AppendStripeMetadata(writeState->tableMetadata, stripeMetadata);
|
||||
}
|
||||
|
||||
SyncAndCloseFile(writeState->tableFile);
|
||||
|
||||
MemoryContextDelete(writeState->stripeWriteContext);
|
||||
list_free_deep(writeState->tableMetadata->stripeMetadataList);
|
||||
pfree(writeState->comparisonFunctionArray);
|
||||
|
@ -391,6 +368,56 @@ CreateEmptyStripeSkipList(uint32 stripeMaxRowCount, uint32 blockRowCount,
|
|||
return stripeSkipList;
|
||||
}
|
||||
|
||||
static void
|
||||
WriteToSmgr(TableWriteState *writeState, char *data, uint32 dataLength)
|
||||
{
|
||||
uint64 logicalOffset = writeState->currentFileOffset;
|
||||
uint64 remaining = dataLength;
|
||||
Relation rel = writeState->relation;
|
||||
Buffer buffer;
|
||||
|
||||
while (remaining > 0)
|
||||
{
|
||||
SmgrAddr addr = logical_to_smgr(logicalOffset);
|
||||
BlockNumber nblocks;
|
||||
Page page;
|
||||
PageHeader phdr;
|
||||
uint64 to_write;
|
||||
|
||||
RelationOpenSmgr(rel);
|
||||
nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
||||
|
||||
while (addr.blockno >= nblocks)
|
||||
{
|
||||
Buffer buffer = ReadBuffer(rel, P_NEW);
|
||||
ReleaseBuffer(buffer);
|
||||
nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
||||
}
|
||||
|
||||
RelationCloseSmgr(rel);
|
||||
|
||||
buffer = ReadBuffer(rel, addr.blockno);
|
||||
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
|
||||
|
||||
page = BufferGetPage(buffer);
|
||||
phdr = (PageHeader) page;
|
||||
if (PageIsNew(page))
|
||||
PageInit(page, BLCKSZ, 0);
|
||||
|
||||
/* always appending */
|
||||
Assert(phdr->pd_lower == addr.offset + SizeOfPageHeaderData);
|
||||
|
||||
to_write = Min(phdr->pd_upper - phdr->pd_lower, remaining);
|
||||
memcpy(page + phdr->pd_lower, data, to_write);
|
||||
phdr->pd_lower += to_write;
|
||||
|
||||
UnlockReleaseBuffer(buffer);
|
||||
|
||||
data += to_write;
|
||||
remaining -= to_write;
|
||||
logicalOffset += to_write;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* FlushStripe flushes current stripe data into the file. The function first ensures
|
||||
|
@ -409,7 +436,6 @@ FlushStripe(TableWriteState *writeState)
|
|||
uint32 columnIndex = 0;
|
||||
uint32 blockIndex = 0;
|
||||
TableMetadata *tableMetadata = writeState->tableMetadata;
|
||||
FILE *tableFile = writeState->tableFile;
|
||||
StripeBuffers *stripeBuffers = writeState->stripeBuffers;
|
||||
StripeSkipList *stripeSkipList = writeState->stripeSkipList;
|
||||
ColumnBlockSkipNode **columnSkipNodeArray = stripeSkipList->blockSkipNodeArray;
|
||||
|
@ -419,6 +445,7 @@ FlushStripe(TableWriteState *writeState)
|
|||
uint32 blockRowCount = tableMetadata->blockRowCount;
|
||||
uint32 lastBlockIndex = stripeBuffers->rowCount / blockRowCount;
|
||||
uint32 lastBlockRowCount = stripeBuffers->rowCount % blockRowCount;
|
||||
uint64 initialFileOffset = writeState->currentFileOffset;
|
||||
|
||||
/*
|
||||
* check if the last block needs serialization , the last block was not serialized
|
||||
|
@ -479,7 +506,8 @@ FlushStripe(TableWriteState *writeState)
|
|||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||
{
|
||||
StringInfo skipListBuffer = skipListBufferArray[columnIndex];
|
||||
WriteToFile(tableFile, skipListBuffer->data, skipListBuffer->len);
|
||||
WriteToSmgr(writeState, skipListBuffer->data, skipListBuffer->len);
|
||||
writeState->currentFileOffset += skipListBuffer->len;
|
||||
}
|
||||
|
||||
/* then, we flush the data buffers */
|
||||
|
@ -494,7 +522,8 @@ FlushStripe(TableWriteState *writeState)
|
|||
columnBuffers->blockBuffersArray[blockIndex];
|
||||
StringInfo existsBuffer = blockBuffers->existsBuffer;
|
||||
|
||||
WriteToFile(tableFile, existsBuffer->data, existsBuffer->len);
|
||||
WriteToSmgr(writeState, existsBuffer->data, existsBuffer->len);
|
||||
writeState->currentFileOffset += existsBuffer->len;
|
||||
}
|
||||
|
||||
for (blockIndex = 0; blockIndex < stripeSkipList->blockCount; blockIndex++)
|
||||
|
@ -503,7 +532,8 @@ FlushStripe(TableWriteState *writeState)
|
|||
columnBuffers->blockBuffersArray[blockIndex];
|
||||
StringInfo valueBuffer = blockBuffers->valueBuffer;
|
||||
|
||||
WriteToFile(tableFile, valueBuffer->data, valueBuffer->len);
|
||||
WriteToSmgr(writeState, valueBuffer->data, valueBuffer->len);
|
||||
writeState->currentFileOffset += valueBuffer->len;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -520,16 +550,12 @@ FlushStripe(TableWriteState *writeState)
|
|||
dataLength += stripeFooter->valueSizeArray[columnIndex];
|
||||
}
|
||||
|
||||
stripeMetadata.fileOffset = writeState->currentFileOffset;
|
||||
stripeMetadata.fileOffset = initialFileOffset;
|
||||
stripeMetadata.skipListLength = skipListLength;
|
||||
stripeMetadata.dataLength = dataLength;
|
||||
stripeMetadata.footerLength = 0;
|
||||
stripeMetadata.id = writeState->currentStripeId;
|
||||
|
||||
/* advance current file offset */
|
||||
writeState->currentFileOffset += skipListLength;
|
||||
writeState->currentFileOffset += dataLength;
|
||||
|
||||
return stripeMetadata;
|
||||
}
|
||||
|
||||
|
@ -834,76 +860,6 @@ AppendStripeMetadata(TableMetadata *tableMetadata, StripeMetadata stripeMetadata
|
|||
stripeMetadataCopy);
|
||||
}
|
||||
|
||||
|
||||
/* Writes the given data to the given file pointer and checks for errors. */
|
||||
static void
|
||||
WriteToFile(FILE *file, void *data, uint32 dataLength)
|
||||
{
|
||||
int writeResult = 0;
|
||||
int errorResult = 0;
|
||||
|
||||
if (dataLength == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
errno = 0;
|
||||
writeResult = fwrite(data, dataLength, 1, file);
|
||||
if (writeResult != 1)
|
||||
{
|
||||
ereport(ERROR, (errcode_for_file_access(),
|
||||
errmsg("could not write file: %m")));
|
||||
}
|
||||
|
||||
errorResult = ferror(file);
|
||||
if (errorResult != 0)
|
||||
{
|
||||
ereport(ERROR, (errcode_for_file_access(),
|
||||
errmsg("error in file: %m")));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* Flushes, syncs, and closes the given file pointer and checks for errors. */
|
||||
static void
|
||||
SyncAndCloseFile(FILE *file)
|
||||
{
|
||||
int flushResult = 0;
|
||||
int syncResult = 0;
|
||||
int errorResult = 0;
|
||||
int freeResult = 0;
|
||||
|
||||
errno = 0;
|
||||
flushResult = fflush(file);
|
||||
if (flushResult != 0)
|
||||
{
|
||||
ereport(ERROR, (errcode_for_file_access(),
|
||||
errmsg("could not flush file: %m")));
|
||||
}
|
||||
|
||||
syncResult = pg_fsync(fileno(file));
|
||||
if (syncResult != 0)
|
||||
{
|
||||
ereport(ERROR, (errcode_for_file_access(),
|
||||
errmsg("could not sync file: %m")));
|
||||
}
|
||||
|
||||
errorResult = ferror(file);
|
||||
if (errorResult != 0)
|
||||
{
|
||||
ereport(ERROR, (errcode_for_file_access(),
|
||||
errmsg("error in file: %m")));
|
||||
}
|
||||
|
||||
freeResult = FreeFile(file);
|
||||
if (freeResult != 0)
|
||||
{
|
||||
ereport(ERROR, (errcode_for_file_access(),
|
||||
errmsg("could not close file: %m")));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CopyStringInfo creates a deep copy of given source string allocating only needed
|
||||
* amount of memory.
|
||||
|
|
|
@ -12,17 +12,6 @@
|
|||
-- 'postgres' directory is excluded from comparison to have the same result.
|
||||
-- store postgres database oid
|
||||
SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset
|
||||
-- Check that files for the automatically managed table exist in the
|
||||
-- cstore_fdw/{databaseoid} directory.
|
||||
SELECT count(*) FROM (
|
||||
SELECT pg_ls_dir('cstore_fdw/' || databaseoid ) FROM (
|
||||
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database()
|
||||
) AS q1) AS q2;
|
||||
count
|
||||
-------
|
||||
2
|
||||
(1 row)
|
||||
|
||||
-- DROP cstore_fdw tables
|
||||
DROP FOREIGN TABLE contestant;
|
||||
DROP FOREIGN TABLE contestant_compressed;
|
||||
|
@ -31,17 +20,6 @@ CREATE SCHEMA test_schema;
|
|||
CREATE FOREIGN TABLE test_schema.test_table(data int) SERVER cstore_server;
|
||||
DROP SCHEMA test_schema CASCADE;
|
||||
NOTICE: drop cascades to foreign table test_schema.test_table
|
||||
-- Check that the files have been deleted and the directory is empty after the
|
||||
-- DROP table command.
|
||||
SELECT count(*) FROM (
|
||||
SELECT pg_ls_dir('cstore_fdw/' || databaseoid ) FROM (
|
||||
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database()
|
||||
) AS q1) AS q2;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT current_database() datname \gset
|
||||
CREATE DATABASE db_to_drop;
|
||||
\c db_to_drop
|
||||
|
@ -49,49 +27,14 @@ CREATE EXTENSION cstore_fdw;
|
|||
CREATE SERVER cstore_server FOREIGN DATA WRAPPER cstore_fdw;
|
||||
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database() \gset
|
||||
CREATE FOREIGN TABLE test_table(data int) SERVER cstore_server;
|
||||
-- should see 2 files, data and footer file for single table
|
||||
SELECT count(*) FROM pg_ls_dir('cstore_fdw/' || :databaseoid);
|
||||
count
|
||||
-------
|
||||
2
|
||||
(1 row)
|
||||
|
||||
-- should see 2 directories 1 for each database, excluding postgres database
|
||||
SELECT count(*) FROM pg_ls_dir('cstore_fdw') WHERE pg_ls_dir != :postgres_oid::text;
|
||||
count
|
||||
-------
|
||||
2
|
||||
(1 row)
|
||||
|
||||
DROP EXTENSION cstore_fdw CASCADE;
|
||||
NOTICE: drop cascades to 2 other objects
|
||||
DETAIL: drop cascades to server cstore_server
|
||||
drop cascades to foreign table test_table
|
||||
-- should only see 1 directory here
|
||||
SELECT count(*) FROM pg_ls_dir('cstore_fdw') WHERE pg_ls_dir != :postgres_oid::text;
|
||||
count
|
||||
-------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
-- test database drop
|
||||
CREATE EXTENSION cstore_fdw;
|
||||
CREATE SERVER cstore_server FOREIGN DATA WRAPPER cstore_fdw;
|
||||
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database() \gset
|
||||
CREATE FOREIGN TABLE test_table(data int) SERVER cstore_server;
|
||||
-- should see 2 directories 1 for each database
|
||||
SELECT count(*) FROM pg_ls_dir('cstore_fdw') WHERE pg_ls_dir != :postgres_oid::text;
|
||||
count
|
||||
-------
|
||||
2
|
||||
(1 row)
|
||||
|
||||
\c :datname
|
||||
DROP DATABASE db_to_drop;
|
||||
-- should only see 1 directory for the default database
|
||||
SELECT count(*) FROM pg_ls_dir('cstore_fdw') WHERE pg_ls_dir != :postgres_oid::text;
|
||||
count
|
||||
-------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
|
|
|
@ -9,17 +9,6 @@ SELECT substring(:'server_version', '\d+')::int > 10 AS version_above_ten;
|
|||
t
|
||||
(1 row)
|
||||
|
||||
-- Check that files for the automatically managed table exist in the
|
||||
-- cstore_fdw/{databaseoid} directory.
|
||||
SELECT count(*) FROM (
|
||||
SELECT pg_ls_dir('cstore_fdw/' || databaseoid ) FROM (
|
||||
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database()
|
||||
) AS q1) AS q2;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- CREATE a cstore_fdw table, fill with some data --
|
||||
CREATE FOREIGN TABLE cstore_truncate_test (a int, b int) SERVER cstore_server;
|
||||
CREATE FOREIGN TABLE cstore_truncate_test_second (a int, b int) SERVER cstore_server;
|
||||
|
@ -75,16 +64,6 @@ SELECT cstore_table_size('cstore_truncate_test_compressed');
|
|||
0
|
||||
(1 row)
|
||||
|
||||
-- make sure data files still present
|
||||
SELECT count(*) FROM (
|
||||
SELECT pg_ls_dir('cstore_fdw/' || databaseoid ) FROM (
|
||||
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database()
|
||||
) AS q1) AS q2;
|
||||
count
|
||||
-------
|
||||
3
|
||||
(1 row)
|
||||
|
||||
INSERT INTO cstore_truncate_test select a, a from generate_series(1, 10) a;
|
||||
INSERT INTO cstore_truncate_test_regular select a, a from generate_series(10, 20) a;
|
||||
INSERT INTO cstore_truncate_test_second select a, a from generate_series(20, 30) a;
|
||||
|
@ -250,13 +229,3 @@ SELECT count(*) FROM truncate_schema.truncate_tbl;
|
|||
DROP SCHEMA truncate_schema CASCADE;
|
||||
NOTICE: drop cascades to foreign table truncate_schema.truncate_tbl
|
||||
DROP USER truncate_user;
|
||||
-- verify files are removed
|
||||
SELECT count(*) FROM (
|
||||
SELECT pg_ls_dir('cstore_fdw/' || databaseoid ) FROM (
|
||||
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database()
|
||||
) AS q1) AS q2;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
|
|
|
@ -30,8 +30,7 @@ $$ LANGUAGE PLPGSQL;
|
|||
-- Create and load data
|
||||
CREATE FOREIGN TABLE test_block_filtering (a int)
|
||||
SERVER cstore_server
|
||||
OPTIONS(filename '@abs_srcdir@/data/block_filtering.cstore',
|
||||
block_row_count '1000', stripe_row_count '2000');
|
||||
OPTIONS(block_row_count '1000', stripe_row_count '2000');
|
||||
|
||||
COPY test_block_filtering FROM '@abs_srcdir@/data/block_filtering.csv' WITH CSV;
|
||||
|
||||
|
@ -60,8 +59,7 @@ SELECT filtered_row_count('SELECT count(*) FROM test_block_filtering WHERE a BET
|
|||
|
||||
-- Verify that we are fine with collations which use a different alphabet order
|
||||
CREATE FOREIGN TABLE collation_block_filtering_test(A text collate "da_DK")
|
||||
SERVER cstore_server
|
||||
OPTIONS(filename '@abs_srcdir@/data/collation_block_filtering.cstore');
|
||||
SERVER cstore_server;
|
||||
COPY collation_block_filtering_test FROM STDIN;
|
||||
A
|
||||
Å
|
||||
|
|
|
@ -3,8 +3,7 @@
|
|||
--
|
||||
CREATE FOREIGN TABLE test_contestant(handle TEXT, birthdate DATE, rating INT,
|
||||
percentile FLOAT, country CHAR(3), achievements TEXT[])
|
||||
SERVER cstore_server
|
||||
OPTIONS(filename '@abs_srcdir@/data/test_contestant.cstore');
|
||||
SERVER cstore_server;
|
||||
|
||||
-- load table data from file
|
||||
COPY test_contestant FROM '@abs_srcdir@/data/contestants.1.csv' WITH CSV;
|
||||
|
|
|
@ -12,30 +12,24 @@ CREATE SERVER cstore_server FOREIGN DATA WRAPPER cstore_fdw;
|
|||
-- Validator tests
|
||||
CREATE FOREIGN TABLE test_validator_invalid_option ()
|
||||
SERVER cstore_server
|
||||
OPTIONS(filename 'data.cstore', bad_option_name '1'); -- ERROR
|
||||
OPTIONS(bad_option_name '1'); -- ERROR
|
||||
|
||||
CREATE FOREIGN TABLE test_validator_invalid_stripe_row_count ()
|
||||
SERVER cstore_server
|
||||
OPTIONS(filename 'data.cstore', stripe_row_count '0'); -- ERROR
|
||||
OPTIONS(stripe_row_count '0'); -- ERROR
|
||||
|
||||
CREATE FOREIGN TABLE test_validator_invalid_block_row_count ()
|
||||
SERVER cstore_server
|
||||
OPTIONS(filename 'data.cstore', block_row_count '0'); -- ERROR
|
||||
OPTIONS(block_row_count '0'); -- ERROR
|
||||
|
||||
CREATE FOREIGN TABLE test_validator_invalid_compression_type ()
|
||||
SERVER cstore_server
|
||||
OPTIONS(filename 'data.cstore', compression 'invalid_compression'); -- ERROR
|
||||
|
||||
-- Invalid file path test
|
||||
CREATE FOREIGN TABLE test_invalid_file_path ()
|
||||
SERVER cstore_server
|
||||
OPTIONS(filename 'bad_directory_path/bad_file_path'); --ERROR
|
||||
OPTIONS(compression 'invalid_compression'); -- ERROR
|
||||
|
||||
-- Create uncompressed table
|
||||
CREATE FOREIGN TABLE contestant (handle TEXT, birthdate DATE, rating INT,
|
||||
percentile FLOAT, country CHAR(3), achievements TEXT[])
|
||||
SERVER cstore_server
|
||||
OPTIONS(filename '@abs_srcdir@/data/contestant.cstore');
|
||||
SERVER cstore_server;
|
||||
|
||||
|
||||
-- Create compressed table with automatically determined file path
|
||||
|
|
|
@ -11,8 +11,7 @@ SET intervalstyle TO 'POSTGRES_VERBOSE';
|
|||
|
||||
-- Test array types
|
||||
CREATE FOREIGN TABLE test_array_types (int_array int[], bigint_array bigint[],
|
||||
text_array text[]) SERVER cstore_server
|
||||
OPTIONS(filename '@abs_srcdir@/data/array_types.cstore');
|
||||
text_array text[]) SERVER cstore_server;
|
||||
|
||||
COPY test_array_types FROM '@abs_srcdir@/data/array_types.csv' WITH CSV;
|
||||
|
||||
|
@ -22,8 +21,7 @@ SELECT * FROM test_array_types;
|
|||
-- Test date/time types
|
||||
CREATE FOREIGN TABLE test_datetime_types (timestamp timestamp,
|
||||
timestamp_with_timezone timestamp with time zone, date date, time time,
|
||||
interval interval) SERVER cstore_server
|
||||
OPTIONS(filename '@abs_srcdir@/data/datetime_types.cstore');
|
||||
interval interval) SERVER cstore_server;
|
||||
|
||||
COPY test_datetime_types FROM '@abs_srcdir@/data/datetime_types.csv' WITH CSV;
|
||||
|
||||
|
@ -35,8 +33,7 @@ CREATE TYPE enum_type AS ENUM ('a', 'b', 'c');
|
|||
CREATE TYPE composite_type AS (a int, b text);
|
||||
|
||||
CREATE FOREIGN TABLE test_enum_and_composite_types (enum enum_type,
|
||||
composite composite_type) SERVER cstore_server
|
||||
OPTIONS(filename '@abs_srcdir@/data/enum_and_composite_types.cstore');
|
||||
composite composite_type) SERVER cstore_server;
|
||||
|
||||
COPY test_enum_and_composite_types FROM
|
||||
'@abs_srcdir@/data/enum_and_composite_types.csv' WITH CSV;
|
||||
|
@ -46,8 +43,7 @@ SELECT * FROM test_enum_and_composite_types;
|
|||
|
||||
-- Test range types
|
||||
CREATE FOREIGN TABLE test_range_types (int4range int4range, int8range int8range,
|
||||
numrange numrange, tsrange tsrange) SERVER cstore_server
|
||||
OPTIONS(filename '@abs_srcdir@/data/range_types.cstore');
|
||||
numrange numrange, tsrange tsrange) SERVER cstore_server;
|
||||
|
||||
COPY test_range_types FROM '@abs_srcdir@/data/range_types.csv' WITH CSV;
|
||||
|
||||
|
@ -56,8 +52,7 @@ SELECT * FROM test_range_types;
|
|||
|
||||
-- Test other types
|
||||
CREATE FOREIGN TABLE test_other_types (bool boolean, bytea bytea, money money,
|
||||
inet inet, bitstring bit varying(5), uuid uuid, json json) SERVER cstore_server
|
||||
OPTIONS(filename '@abs_srcdir@/data/other_types.cstore');
|
||||
inet inet, bitstring bit varying(5), uuid uuid, json json) SERVER cstore_server;
|
||||
|
||||
COPY test_other_types FROM '@abs_srcdir@/data/other_types.csv' WITH CSV;
|
||||
|
||||
|
@ -66,8 +61,7 @@ SELECT * FROM test_other_types;
|
|||
|
||||
-- Test null values
|
||||
CREATE FOREIGN TABLE test_null_values (a int, b int[], c composite_type)
|
||||
SERVER cstore_server
|
||||
OPTIONS(filename '@abs_srcdir@/data/null_values.cstore');
|
||||
SERVER cstore_server;
|
||||
|
||||
COPY test_null_values FROM '@abs_srcdir@/data/null_values.csv' WITH CSV;
|
||||
|
||||
|
|
|
@ -26,8 +26,7 @@ $$ LANGUAGE PLPGSQL;
|
|||
-- Create and load data
|
||||
CREATE FOREIGN TABLE test_block_filtering (a int)
|
||||
SERVER cstore_server
|
||||
OPTIONS(filename '@abs_srcdir@/data/block_filtering.cstore',
|
||||
block_row_count '1000', stripe_row_count '2000');
|
||||
OPTIONS(block_row_count '1000', stripe_row_count '2000');
|
||||
COPY test_block_filtering FROM '@abs_srcdir@/data/block_filtering.csv' WITH CSV;
|
||||
-- Verify that filtered_row_count is less than 1000 for the following queries
|
||||
SELECT filtered_row_count('SELECT count(*) FROM test_block_filtering');
|
||||
|
@ -107,8 +106,7 @@ SELECT filtered_row_count('SELECT count(*) FROM test_block_filtering WHERE a BET
|
|||
|
||||
-- Verify that we are fine with collations which use a different alphabet order
|
||||
CREATE FOREIGN TABLE collation_block_filtering_test(A text collate "da_DK")
|
||||
SERVER cstore_server
|
||||
OPTIONS(filename '@abs_srcdir@/data/collation_block_filtering.cstore');
|
||||
SERVER cstore_server;
|
||||
COPY collation_block_filtering_test FROM STDIN;
|
||||
SELECT * FROM collation_block_filtering_test WHERE A > 'B';
|
||||
a
|
||||
|
|
|
@ -3,8 +3,7 @@
|
|||
--
|
||||
CREATE FOREIGN TABLE test_contestant(handle TEXT, birthdate DATE, rating INT,
|
||||
percentile FLOAT, country CHAR(3), achievements TEXT[])
|
||||
SERVER cstore_server
|
||||
OPTIONS(filename '@abs_srcdir@/data/test_contestant.cstore');
|
||||
SERVER cstore_server;
|
||||
-- load table data from file
|
||||
COPY test_contestant FROM '@abs_srcdir@/data/contestants.1.csv' WITH CSV;
|
||||
-- export using COPY table TO ...
|
||||
|
|
|
@ -7,34 +7,28 @@ CREATE SERVER cstore_server FOREIGN DATA WRAPPER cstore_fdw;
|
|||
-- Validator tests
|
||||
CREATE FOREIGN TABLE test_validator_invalid_option ()
|
||||
SERVER cstore_server
|
||||
OPTIONS(filename 'data.cstore', bad_option_name '1'); -- ERROR
|
||||
OPTIONS(bad_option_name '1'); -- ERROR
|
||||
ERROR: invalid option "bad_option_name"
|
||||
HINT: Valid options in this context are: filename, compression, stripe_row_count, block_row_count
|
||||
HINT: Valid options in this context are: compression, stripe_row_count, block_row_count
|
||||
CREATE FOREIGN TABLE test_validator_invalid_stripe_row_count ()
|
||||
SERVER cstore_server
|
||||
OPTIONS(filename 'data.cstore', stripe_row_count '0'); -- ERROR
|
||||
OPTIONS(stripe_row_count '0'); -- ERROR
|
||||
ERROR: invalid stripe row count
|
||||
HINT: Stripe row count must be an integer between 1000 and 10000000
|
||||
CREATE FOREIGN TABLE test_validator_invalid_block_row_count ()
|
||||
SERVER cstore_server
|
||||
OPTIONS(filename 'data.cstore', block_row_count '0'); -- ERROR
|
||||
OPTIONS(block_row_count '0'); -- ERROR
|
||||
ERROR: invalid block row count
|
||||
HINT: Block row count must be an integer between 1000 and 100000
|
||||
CREATE FOREIGN TABLE test_validator_invalid_compression_type ()
|
||||
SERVER cstore_server
|
||||
OPTIONS(filename 'data.cstore', compression 'invalid_compression'); -- ERROR
|
||||
OPTIONS(compression 'invalid_compression'); -- ERROR
|
||||
ERROR: invalid compression type
|
||||
HINT: Valid options are: none, pglz
|
||||
-- Invalid file path test
|
||||
CREATE FOREIGN TABLE test_invalid_file_path ()
|
||||
SERVER cstore_server
|
||||
OPTIONS(filename 'bad_directory_path/bad_file_path'); --ERROR
|
||||
ERROR: could not open file "bad_directory_path/bad_file_path" for writing: No such file or directory
|
||||
-- Create uncompressed table
|
||||
CREATE FOREIGN TABLE contestant (handle TEXT, birthdate DATE, rating INT,
|
||||
percentile FLOAT, country CHAR(3), achievements TEXT[])
|
||||
SERVER cstore_server
|
||||
OPTIONS(filename '@abs_srcdir@/data/contestant.cstore');
|
||||
SERVER cstore_server;
|
||||
-- Create compressed table with automatically determined file path
|
||||
CREATE FOREIGN TABLE contestant_compressed (handle TEXT, birthdate DATE, rating INT,
|
||||
percentile FLOAT, country CHAR(3), achievements TEXT[])
|
||||
|
|
|
@ -7,8 +7,7 @@ SET timezone to 'GMT';
|
|||
SET intervalstyle TO 'POSTGRES_VERBOSE';
|
||||
-- Test array types
|
||||
CREATE FOREIGN TABLE test_array_types (int_array int[], bigint_array bigint[],
|
||||
text_array text[]) SERVER cstore_server
|
||||
OPTIONS(filename '@abs_srcdir@/data/array_types.cstore');
|
||||
text_array text[]) SERVER cstore_server;
|
||||
COPY test_array_types FROM '@abs_srcdir@/data/array_types.csv' WITH CSV;
|
||||
SELECT * FROM test_array_types;
|
||||
int_array | bigint_array | text_array
|
||||
|
@ -21,8 +20,7 @@ SELECT * FROM test_array_types;
|
|||
-- Test date/time types
|
||||
CREATE FOREIGN TABLE test_datetime_types (timestamp timestamp,
|
||||
timestamp_with_timezone timestamp with time zone, date date, time time,
|
||||
interval interval) SERVER cstore_server
|
||||
OPTIONS(filename '@abs_srcdir@/data/datetime_types.cstore');
|
||||
interval interval) SERVER cstore_server;
|
||||
COPY test_datetime_types FROM '@abs_srcdir@/data/datetime_types.csv' WITH CSV;
|
||||
SELECT * FROM test_datetime_types;
|
||||
timestamp | timestamp_with_timezone | date | time | interval
|
||||
|
@ -35,8 +33,7 @@ SELECT * FROM test_datetime_types;
|
|||
CREATE TYPE enum_type AS ENUM ('a', 'b', 'c');
|
||||
CREATE TYPE composite_type AS (a int, b text);
|
||||
CREATE FOREIGN TABLE test_enum_and_composite_types (enum enum_type,
|
||||
composite composite_type) SERVER cstore_server
|
||||
OPTIONS(filename '@abs_srcdir@/data/enum_and_composite_types.cstore');
|
||||
composite composite_type) SERVER cstore_server;
|
||||
COPY test_enum_and_composite_types FROM
|
||||
'@abs_srcdir@/data/enum_and_composite_types.csv' WITH CSV;
|
||||
SELECT * FROM test_enum_and_composite_types;
|
||||
|
@ -48,8 +45,7 @@ SELECT * FROM test_enum_and_composite_types;
|
|||
|
||||
-- Test range types
|
||||
CREATE FOREIGN TABLE test_range_types (int4range int4range, int8range int8range,
|
||||
numrange numrange, tsrange tsrange) SERVER cstore_server
|
||||
OPTIONS(filename '@abs_srcdir@/data/range_types.cstore');
|
||||
numrange numrange, tsrange tsrange) SERVER cstore_server;
|
||||
COPY test_range_types FROM '@abs_srcdir@/data/range_types.csv' WITH CSV;
|
||||
SELECT * FROM test_range_types;
|
||||
int4range | int8range | numrange | tsrange
|
||||
|
@ -60,8 +56,7 @@ SELECT * FROM test_range_types;
|
|||
|
||||
-- Test other types
|
||||
CREATE FOREIGN TABLE test_other_types (bool boolean, bytea bytea, money money,
|
||||
inet inet, bitstring bit varying(5), uuid uuid, json json) SERVER cstore_server
|
||||
OPTIONS(filename '@abs_srcdir@/data/other_types.cstore');
|
||||
inet inet, bitstring bit varying(5), uuid uuid, json json) SERVER cstore_server;
|
||||
COPY test_other_types FROM '@abs_srcdir@/data/other_types.csv' WITH CSV;
|
||||
SELECT * FROM test_other_types;
|
||||
bool | bytea | money | inet | bitstring | uuid | json
|
||||
|
@ -72,8 +67,7 @@ SELECT * FROM test_other_types;
|
|||
|
||||
-- Test null values
|
||||
CREATE FOREIGN TABLE test_null_values (a int, b int[], c composite_type)
|
||||
SERVER cstore_server
|
||||
OPTIONS(filename '@abs_srcdir@/data/null_values.cstore');
|
||||
SERVER cstore_server;
|
||||
COPY test_null_values FROM '@abs_srcdir@/data/null_values.csv' WITH CSV;
|
||||
SELECT * FROM test_null_values;
|
||||
a | b | c
|
||||
|
|
28
sql/drop.sql
28
sql/drop.sql
|
@ -15,13 +15,6 @@
|
|||
-- store postgres database oid
|
||||
SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset
|
||||
|
||||
-- Check that files for the automatically managed table exist in the
|
||||
-- cstore_fdw/{databaseoid} directory.
|
||||
SELECT count(*) FROM (
|
||||
SELECT pg_ls_dir('cstore_fdw/' || databaseoid ) FROM (
|
||||
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database()
|
||||
) AS q1) AS q2;
|
||||
|
||||
-- DROP cstore_fdw tables
|
||||
DROP FOREIGN TABLE contestant;
|
||||
DROP FOREIGN TABLE contestant_compressed;
|
||||
|
@ -31,13 +24,6 @@ CREATE SCHEMA test_schema;
|
|||
CREATE FOREIGN TABLE test_schema.test_table(data int) SERVER cstore_server;
|
||||
DROP SCHEMA test_schema CASCADE;
|
||||
|
||||
-- Check that the files have been deleted and the directory is empty after the
|
||||
-- DROP table command.
|
||||
SELECT count(*) FROM (
|
||||
SELECT pg_ls_dir('cstore_fdw/' || databaseoid ) FROM (
|
||||
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database()
|
||||
) AS q1) AS q2;
|
||||
|
||||
SELECT current_database() datname \gset
|
||||
|
||||
CREATE DATABASE db_to_drop;
|
||||
|
@ -47,17 +33,9 @@ CREATE SERVER cstore_server FOREIGN DATA WRAPPER cstore_fdw;
|
|||
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database() \gset
|
||||
|
||||
CREATE FOREIGN TABLE test_table(data int) SERVER cstore_server;
|
||||
-- should see 2 files, data and footer file for single table
|
||||
SELECT count(*) FROM pg_ls_dir('cstore_fdw/' || :databaseoid);
|
||||
|
||||
-- should see 2 directories 1 for each database, excluding postgres database
|
||||
SELECT count(*) FROM pg_ls_dir('cstore_fdw') WHERE pg_ls_dir != :postgres_oid::text;
|
||||
|
||||
DROP EXTENSION cstore_fdw CASCADE;
|
||||
|
||||
-- should only see 1 directory here
|
||||
SELECT count(*) FROM pg_ls_dir('cstore_fdw') WHERE pg_ls_dir != :postgres_oid::text;
|
||||
|
||||
-- test database drop
|
||||
CREATE EXTENSION cstore_fdw;
|
||||
CREATE SERVER cstore_server FOREIGN DATA WRAPPER cstore_fdw;
|
||||
|
@ -65,12 +43,6 @@ SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database()
|
|||
|
||||
CREATE FOREIGN TABLE test_table(data int) SERVER cstore_server;
|
||||
|
||||
-- should see 2 directories 1 for each database
|
||||
SELECT count(*) FROM pg_ls_dir('cstore_fdw') WHERE pg_ls_dir != :postgres_oid::text;
|
||||
|
||||
\c :datname
|
||||
|
||||
DROP DATABASE db_to_drop;
|
||||
|
||||
-- should only see 1 directory for the default database
|
||||
SELECT count(*) FROM pg_ls_dir('cstore_fdw') WHERE pg_ls_dir != :postgres_oid::text;
|
||||
|
|
|
@ -6,13 +6,6 @@
|
|||
SHOW server_version \gset
|
||||
SELECT substring(:'server_version', '\d+')::int > 10 AS version_above_ten;
|
||||
|
||||
-- Check that files for the automatically managed table exist in the
|
||||
-- cstore_fdw/{databaseoid} directory.
|
||||
SELECT count(*) FROM (
|
||||
SELECT pg_ls_dir('cstore_fdw/' || databaseoid ) FROM (
|
||||
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database()
|
||||
) AS q1) AS q2;
|
||||
|
||||
-- CREATE a cstore_fdw table, fill with some data --
|
||||
CREATE FOREIGN TABLE cstore_truncate_test (a int, b int) SERVER cstore_server;
|
||||
CREATE FOREIGN TABLE cstore_truncate_test_second (a int, b int) SERVER cstore_server;
|
||||
|
@ -39,12 +32,6 @@ SELECT count(*) FROM cstore_truncate_test_compressed;
|
|||
|
||||
SELECT cstore_table_size('cstore_truncate_test_compressed');
|
||||
|
||||
-- make sure data files still present
|
||||
SELECT count(*) FROM (
|
||||
SELECT pg_ls_dir('cstore_fdw/' || databaseoid ) FROM (
|
||||
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database()
|
||||
) AS q1) AS q2;
|
||||
|
||||
INSERT INTO cstore_truncate_test select a, a from generate_series(1, 10) a;
|
||||
INSERT INTO cstore_truncate_test_regular select a, a from generate_series(10, 20) a;
|
||||
INSERT INTO cstore_truncate_test_second select a, a from generate_series(20, 30) a;
|
||||
|
@ -127,9 +114,3 @@ SELECT count(*) FROM truncate_schema.truncate_tbl;
|
|||
-- cleanup
|
||||
DROP SCHEMA truncate_schema CASCADE;
|
||||
DROP USER truncate_user;
|
||||
|
||||
-- verify files are removed
|
||||
SELECT count(*) FROM (
|
||||
SELECT pg_ls_dir('cstore_fdw/' || databaseoid ) FROM (
|
||||
SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database()
|
||||
) AS q1) AS q2;
|
||||
|
|
Loading…
Reference in New Issue