mirror of https://github.com/citusdata/citus.git
commit
e5a3bd18ae
26
cstore.c
26
cstore.c
|
@ -99,29 +99,3 @@ ParseCompressionType(const char *compressionTypeString)
|
||||||
|
|
||||||
return compressionType;
|
return compressionType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* InitializeCStoreTableFile creates data and footer file for a cstore table.
|
|
||||||
* The function assumes data and footer files do not exist, therefore
|
|
||||||
* it should be called on empty or non-existing table. Notice that the caller
|
|
||||||
* is expected to acquire AccessExclusiveLock on the relation.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
InitializeCStoreTableFile(Oid relationId, Relation relation, CStoreOptions *cstoreOptions)
|
|
||||||
{
|
|
||||||
TableWriteState *writeState = NULL;
|
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(relation);
|
|
||||||
|
|
||||||
InitCStoreTableMetadata(relationId, cstoreOptions->blockRowCount);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Initialize state to write to the cstore file. This creates an
|
|
||||||
* empty data file and a valid footer file for the table.
|
|
||||||
*/
|
|
||||||
writeState = CStoreBeginWrite(relationId,
|
|
||||||
cstoreOptions->compressionType,
|
|
||||||
cstoreOptions->stripeRowCount,
|
|
||||||
cstoreOptions->blockRowCount, tupleDescriptor);
|
|
||||||
CStoreEndWrite(writeState);
|
|
||||||
}
|
|
||||||
|
|
33
cstore.h
33
cstore.h
|
@ -16,7 +16,9 @@
|
||||||
|
|
||||||
#include "fmgr.h"
|
#include "fmgr.h"
|
||||||
#include "lib/stringinfo.h"
|
#include "lib/stringinfo.h"
|
||||||
|
#include "nodes/parsenodes.h"
|
||||||
#include "storage/bufpage.h"
|
#include "storage/bufpage.h"
|
||||||
|
#include "storage/lockdefs.h"
|
||||||
#include "utils/relcache.h"
|
#include "utils/relcache.h"
|
||||||
|
|
||||||
/* Defines for valid option names */
|
/* Defines for valid option names */
|
||||||
|
@ -85,12 +87,12 @@ typedef struct StripeMetadata
|
||||||
} StripeMetadata;
|
} StripeMetadata;
|
||||||
|
|
||||||
|
|
||||||
/* TableMetadata represents the metadata of a cstore file. */
|
/* DataFileMetadata represents the metadata of a cstore file. */
|
||||||
typedef struct TableMetadata
|
typedef struct DataFileMetadata
|
||||||
{
|
{
|
||||||
List *stripeMetadataList;
|
List *stripeMetadataList;
|
||||||
uint64 blockRowCount;
|
uint64 blockRowCount;
|
||||||
} TableMetadata;
|
} DataFileMetadata;
|
||||||
|
|
||||||
|
|
||||||
/* ColumnBlockSkipNode contains statistics for a ColumnBlockData. */
|
/* ColumnBlockSkipNode contains statistics for a ColumnBlockData. */
|
||||||
|
@ -190,9 +192,7 @@ typedef struct StripeBuffers
|
||||||
/* TableReadState represents state of a cstore file read operation. */
|
/* TableReadState represents state of a cstore file read operation. */
|
||||||
typedef struct TableReadState
|
typedef struct TableReadState
|
||||||
{
|
{
|
||||||
Oid relationId;
|
DataFileMetadata *datafileMetadata;
|
||||||
|
|
||||||
TableMetadata *tableMetadata;
|
|
||||||
StripeMetadata *currentStripeMetadata;
|
StripeMetadata *currentStripeMetadata;
|
||||||
TupleDesc tupleDescriptor;
|
TupleDesc tupleDescriptor;
|
||||||
Relation relation;
|
Relation relation;
|
||||||
|
@ -217,8 +217,7 @@ typedef struct TableReadState
|
||||||
/* TableWriteState represents state of a cstore file write operation. */
|
/* TableWriteState represents state of a cstore file write operation. */
|
||||||
typedef struct TableWriteState
|
typedef struct TableWriteState
|
||||||
{
|
{
|
||||||
Oid relationId;
|
DataFileMetadata *datafileMetadata;
|
||||||
TableMetadata *tableMetadata;
|
|
||||||
CompressionType compressionType;
|
CompressionType compressionType;
|
||||||
TupleDesc tupleDescriptor;
|
TupleDesc tupleDescriptor;
|
||||||
FmgrInfo **comparisonFunctionArray;
|
FmgrInfo **comparisonFunctionArray;
|
||||||
|
@ -249,11 +248,9 @@ extern int cstore_block_row_count;
|
||||||
extern void cstore_init(void);
|
extern void cstore_init(void);
|
||||||
|
|
||||||
extern CompressionType ParseCompressionType(const char *compressionTypeString);
|
extern CompressionType ParseCompressionType(const char *compressionTypeString);
|
||||||
extern void InitializeCStoreTableFile(Oid relationId, Relation relation,
|
|
||||||
CStoreOptions *cstoreOptions);
|
|
||||||
|
|
||||||
/* Function declarations for writing to a cstore file */
|
/* Function declarations for writing to a cstore file */
|
||||||
extern TableWriteState * CStoreBeginWrite(Oid relationId,
|
extern TableWriteState * CStoreBeginWrite(Relation relation,
|
||||||
CompressionType compressionType,
|
CompressionType compressionType,
|
||||||
uint64 stripeMaxRowCount,
|
uint64 stripeMaxRowCount,
|
||||||
uint32 blockRowCount,
|
uint32 blockRowCount,
|
||||||
|
@ -263,7 +260,7 @@ extern void CStoreWriteRow(TableWriteState *state, Datum *columnValues,
|
||||||
extern void CStoreEndWrite(TableWriteState *state);
|
extern void CStoreEndWrite(TableWriteState *state);
|
||||||
|
|
||||||
/* Function declarations for reading from a cstore file */
|
/* Function declarations for reading from a cstore file */
|
||||||
extern TableReadState * CStoreBeginRead(Oid relationId,
|
extern TableReadState * CStoreBeginRead(Relation relation,
|
||||||
TupleDesc tupleDescriptor,
|
TupleDesc tupleDescriptor,
|
||||||
List *projectedColumnList, List *qualConditions);
|
List *projectedColumnList, List *qualConditions);
|
||||||
extern bool CStoreReadFinished(TableReadState *state);
|
extern bool CStoreReadFinished(TableReadState *state);
|
||||||
|
@ -283,12 +280,14 @@ extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer,
|
||||||
extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType);
|
extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType);
|
||||||
|
|
||||||
/* cstore_metadata_tables.c */
|
/* cstore_metadata_tables.c */
|
||||||
extern void InitCStoreTableMetadata(Oid relid, int blockRowCount);
|
extern void DeleteDataFileMetadataRowIfExists(Oid relfilenode);
|
||||||
extern void InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe);
|
extern void InitCStoreDataFileMetadata(Oid relfilenode, int blockRowCount);
|
||||||
extern TableMetadata * ReadTableMetadata(Oid relid);
|
extern void InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe);
|
||||||
extern void SaveStripeSkipList(Oid relid, uint64 stripe, StripeSkipList *stripeSkipList,
|
extern DataFileMetadata * ReadDataFileMetadata(Oid relfilenode);
|
||||||
|
extern void SaveStripeSkipList(Oid relfilenode, uint64 stripe,
|
||||||
|
StripeSkipList *stripeSkipList,
|
||||||
TupleDesc tupleDescriptor);
|
TupleDesc tupleDescriptor);
|
||||||
extern StripeSkipList * ReadStripeSkipList(Oid relid, uint64 stripe,
|
extern StripeSkipList * ReadStripeSkipList(Oid relfilenode, uint64 stripe,
|
||||||
TupleDesc tupleDescriptor,
|
TupleDesc tupleDescriptor,
|
||||||
uint32 blockCount);
|
uint32 blockCount);
|
||||||
|
|
||||||
|
|
|
@ -31,18 +31,18 @@ RETURNS bigint
|
||||||
AS 'MODULE_PATHNAME'
|
AS 'MODULE_PATHNAME'
|
||||||
LANGUAGE C STRICT;
|
LANGUAGE C STRICT;
|
||||||
|
|
||||||
CREATE TABLE cstore_tables (
|
CREATE TABLE cstore_data_files (
|
||||||
relid oid NOT NULL,
|
relfilenode oid NOT NULL,
|
||||||
block_row_count int NOT NULL,
|
block_row_count int NOT NULL,
|
||||||
version_major bigint NOT NULL,
|
version_major bigint NOT NULL,
|
||||||
version_minor bigint NOT NULL,
|
version_minor bigint NOT NULL,
|
||||||
PRIMARY KEY (relid)
|
PRIMARY KEY (relfilenode)
|
||||||
) WITH (user_catalog_table = true);
|
) WITH (user_catalog_table = true);
|
||||||
|
|
||||||
COMMENT ON TABLE cstore_tables IS 'CStore table wide metadata';
|
COMMENT ON TABLE cstore_data_files IS 'CStore data file wide metadata';
|
||||||
|
|
||||||
CREATE TABLE cstore_stripes (
|
CREATE TABLE cstore_stripes (
|
||||||
relid oid NOT NULL,
|
relfilenode oid NOT NULL,
|
||||||
stripe bigint NOT NULL,
|
stripe bigint NOT NULL,
|
||||||
file_offset bigint NOT NULL,
|
file_offset bigint NOT NULL,
|
||||||
data_length bigint NOT NULL,
|
data_length bigint NOT NULL,
|
||||||
|
@ -50,14 +50,14 @@ CREATE TABLE cstore_stripes (
|
||||||
block_count int NOT NULL,
|
block_count int NOT NULL,
|
||||||
block_row_count int NOT NULL,
|
block_row_count int NOT NULL,
|
||||||
row_count bigint NOT NULL,
|
row_count bigint NOT NULL,
|
||||||
PRIMARY KEY (relid, stripe),
|
PRIMARY KEY (relfilenode, stripe),
|
||||||
FOREIGN KEY (relid) REFERENCES cstore_tables(relid) ON DELETE CASCADE INITIALLY DEFERRED
|
FOREIGN KEY (relfilenode) REFERENCES cstore_data_files(relfilenode) ON DELETE CASCADE INITIALLY DEFERRED
|
||||||
) WITH (user_catalog_table = true);
|
) WITH (user_catalog_table = true);
|
||||||
|
|
||||||
COMMENT ON TABLE cstore_tables IS 'CStore per stripe metadata';
|
COMMENT ON TABLE cstore_stripes IS 'CStore per stripe metadata';
|
||||||
|
|
||||||
CREATE TABLE cstore_skipnodes (
|
CREATE TABLE cstore_skipnodes (
|
||||||
relid oid NOT NULL,
|
relfilenode oid NOT NULL,
|
||||||
stripe bigint NOT NULL,
|
stripe bigint NOT NULL,
|
||||||
attr int NOT NULL,
|
attr int NOT NULL,
|
||||||
block int NOT NULL,
|
block int NOT NULL,
|
||||||
|
@ -69,8 +69,8 @@ CREATE TABLE cstore_skipnodes (
|
||||||
exists_stream_offset bigint NOT NULL,
|
exists_stream_offset bigint NOT NULL,
|
||||||
exists_stream_length bigint NOT NULL,
|
exists_stream_length bigint NOT NULL,
|
||||||
value_compression_type int NOT NULL,
|
value_compression_type int NOT NULL,
|
||||||
PRIMARY KEY (relid, stripe, attr, block),
|
PRIMARY KEY (relfilenode, stripe, attr, block),
|
||||||
FOREIGN KEY (relid, stripe) REFERENCES cstore_stripes(relid, stripe) ON DELETE CASCADE INITIALLY DEFERRED
|
FOREIGN KEY (relfilenode, stripe) REFERENCES cstore_stripes(relfilenode, stripe) ON DELETE CASCADE INITIALLY DEFERRED
|
||||||
) WITH (user_catalog_table = true);
|
) WITH (user_catalog_table = true);
|
||||||
|
|
||||||
COMMENT ON TABLE cstore_tables IS 'CStore per block metadata';
|
COMMENT ON TABLE cstore_skipnodes IS 'CStore per block metadata';
|
||||||
|
|
180
cstore_fdw.c
180
cstore_fdw.c
|
@ -25,6 +25,7 @@
|
||||||
#include "catalog/catalog.h"
|
#include "catalog/catalog.h"
|
||||||
#include "catalog/indexing.h"
|
#include "catalog/indexing.h"
|
||||||
#include "catalog/namespace.h"
|
#include "catalog/namespace.h"
|
||||||
|
#include "catalog/objectaccess.h"
|
||||||
#include "catalog/pg_foreign_table.h"
|
#include "catalog/pg_foreign_table.h"
|
||||||
#include "catalog/pg_namespace.h"
|
#include "catalog/pg_namespace.h"
|
||||||
#include "catalog/storage.h"
|
#include "catalog/storage.h"
|
||||||
|
@ -54,6 +55,7 @@
|
||||||
#include "parser/parser.h"
|
#include "parser/parser.h"
|
||||||
#include "parser/parse_coerce.h"
|
#include "parser/parse_coerce.h"
|
||||||
#include "parser/parse_type.h"
|
#include "parser/parse_type.h"
|
||||||
|
#include "storage/lmgr.h"
|
||||||
#include "storage/smgr.h"
|
#include "storage/smgr.h"
|
||||||
#include "tcop/utility.h"
|
#include "tcop/utility.h"
|
||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
|
@ -105,6 +107,8 @@ static const CStoreValidOption ValidOptionArray[] =
|
||||||
{ OPTION_NAME_BLOCK_ROW_COUNT, ForeignTableRelationId }
|
{ OPTION_NAME_BLOCK_ROW_COUNT, ForeignTableRelationId }
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static object_access_hook_type prevObjectAccessHook = NULL;
|
||||||
|
|
||||||
/* local functions forward declarations */
|
/* local functions forward declarations */
|
||||||
#if PG_VERSION_NUM >= 100000
|
#if PG_VERSION_NUM >= 100000
|
||||||
static void CStoreProcessUtility(PlannedStmt *plannedStatement, const char *queryString,
|
static void CStoreProcessUtility(PlannedStmt *plannedStatement, const char *queryString,
|
||||||
|
@ -126,13 +130,12 @@ static uint64 CopyIntoCStoreTable(const CopyStmt *copyStatement,
|
||||||
const char *queryString);
|
const char *queryString);
|
||||||
static uint64 CopyOutCStoreTable(CopyStmt *copyStatement, const char *queryString);
|
static uint64 CopyOutCStoreTable(CopyStmt *copyStatement, const char *queryString);
|
||||||
static void CStoreProcessAlterTableCommand(AlterTableStmt *alterStatement);
|
static void CStoreProcessAlterTableCommand(AlterTableStmt *alterStatement);
|
||||||
static List * DroppedCStoreRelidList(DropStmt *dropStatement);
|
|
||||||
static List * FindCStoreTables(List *tableList);
|
static List * FindCStoreTables(List *tableList);
|
||||||
static List * OpenRelationsForTruncate(List *cstoreTableList);
|
static List * OpenRelationsForTruncate(List *cstoreTableList);
|
||||||
static void FdwNewRelFileNode(Relation relation);
|
static void FdwNewRelFileNode(Relation relation);
|
||||||
static void TruncateCStoreTables(List *cstoreRelationList);
|
static void TruncateCStoreTables(List *cstoreRelationList);
|
||||||
static bool CStoreTable(Oid relationId);
|
static bool IsCStoreFdwTable(Oid relationId);
|
||||||
static bool CStoreServer(ForeignServer *server);
|
static bool IsCStoreServer(ForeignServer *server);
|
||||||
static bool DistributedTable(Oid relationId);
|
static bool DistributedTable(Oid relationId);
|
||||||
static bool DistributedWorkerCopy(CopyStmt *copyStatement);
|
static bool DistributedWorkerCopy(CopyStmt *copyStatement);
|
||||||
static StringInfo OptionNamesString(Oid currentContextId);
|
static StringInfo OptionNamesString(Oid currentContextId);
|
||||||
|
@ -191,6 +194,9 @@ static bool CStoreIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
|
||||||
static void cstore_fdw_initrel(Relation rel);
|
static void cstore_fdw_initrel(Relation rel);
|
||||||
static Relation cstore_fdw_open(Oid relationId, LOCKMODE lockmode);
|
static Relation cstore_fdw_open(Oid relationId, LOCKMODE lockmode);
|
||||||
static Relation cstore_fdw_openrv(RangeVar *relation, LOCKMODE lockmode);
|
static Relation cstore_fdw_openrv(RangeVar *relation, LOCKMODE lockmode);
|
||||||
|
static void CStoreFdwObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId,
|
||||||
|
int subId,
|
||||||
|
void *arg);
|
||||||
|
|
||||||
PG_FUNCTION_INFO_V1(cstore_ddl_event_end_trigger);
|
PG_FUNCTION_INFO_V1(cstore_ddl_event_end_trigger);
|
||||||
PG_FUNCTION_INFO_V1(cstore_table_size);
|
PG_FUNCTION_INFO_V1(cstore_table_size);
|
||||||
|
@ -212,6 +218,8 @@ cstore_fdw_init()
|
||||||
{
|
{
|
||||||
PreviousProcessUtilityHook = ProcessUtility_hook;
|
PreviousProcessUtilityHook = ProcessUtility_hook;
|
||||||
ProcessUtility_hook = CStoreProcessUtility;
|
ProcessUtility_hook = CStoreProcessUtility;
|
||||||
|
prevObjectAccessHook = object_access_hook;
|
||||||
|
object_access_hook = CStoreFdwObjectAccessHook;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -254,20 +262,13 @@ cstore_ddl_event_end_trigger(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
bool missingOK = false;
|
bool missingOK = false;
|
||||||
ForeignServer *server = GetForeignServerByName(serverName, missingOK);
|
ForeignServer *server = GetForeignServerByName(serverName, missingOK);
|
||||||
if (CStoreServer(server))
|
if (IsCStoreServer(server))
|
||||||
{
|
{
|
||||||
Oid relationId = RangeVarGetRelid(createStatement->base.relation,
|
Oid relationId = RangeVarGetRelid(createStatement->base.relation,
|
||||||
AccessShareLock, false);
|
AccessShareLock, false);
|
||||||
Relation relation = cstore_fdw_open(relationId, AccessExclusiveLock);
|
Relation relation = cstore_fdw_open(relationId, AccessExclusiveLock);
|
||||||
|
CStoreOptions *options = CStoreGetOptions(relationId);
|
||||||
/*
|
InitCStoreDataFileMetadata(relation->rd_node.relNode, options->blockRowCount);
|
||||||
* Make sure database directory exists before creating a table.
|
|
||||||
* This is necessary when a foreign server is created inside
|
|
||||||
* a template database and a new database is created out of it.
|
|
||||||
* We have no chance to hook into server creation to create data
|
|
||||||
* directory for it during database creation time.
|
|
||||||
*/
|
|
||||||
InitializeCStoreTableFile(relationId, relation, CStoreGetOptions(relationId));
|
|
||||||
heap_close(relation, AccessExclusiveLock);
|
heap_close(relation, AccessExclusiveLock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -316,25 +317,6 @@ CStoreProcessUtility(Node * parseTree, const char * queryString,
|
||||||
destReceiver, completionTag);
|
destReceiver, completionTag);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (nodeTag(parseTree) == T_DropStmt)
|
|
||||||
{
|
|
||||||
List *dropRelids = DroppedCStoreRelidList((DropStmt *) parseTree);
|
|
||||||
ListCell *lc = NULL;
|
|
||||||
|
|
||||||
/* drop smgr storage */
|
|
||||||
foreach(lc, dropRelids)
|
|
||||||
{
|
|
||||||
Oid relid = lfirst_oid(lc);
|
|
||||||
Relation relation = cstore_fdw_open(relid, AccessExclusiveLock);
|
|
||||||
|
|
||||||
RelationOpenSmgr(relation);
|
|
||||||
RelationDropStorage(relation);
|
|
||||||
heap_close(relation, AccessExclusiveLock);
|
|
||||||
}
|
|
||||||
|
|
||||||
CALL_PREVIOUS_UTILITY(parseTree, queryString, context, paramListInfo,
|
|
||||||
destReceiver, completionTag);
|
|
||||||
}
|
|
||||||
else if (nodeTag(parseTree) == T_TruncateStmt)
|
else if (nodeTag(parseTree) == T_TruncateStmt)
|
||||||
{
|
{
|
||||||
TruncateStmt *truncateStatement = (TruncateStmt *) parseTree;
|
TruncateStmt *truncateStatement = (TruncateStmt *) parseTree;
|
||||||
|
@ -403,7 +385,7 @@ CopyCStoreTableStatement(CopyStmt *copyStatement)
|
||||||
{
|
{
|
||||||
Oid relationId = RangeVarGetRelid(copyStatement->relation,
|
Oid relationId = RangeVarGetRelid(copyStatement->relation,
|
||||||
AccessShareLock, true);
|
AccessShareLock, true);
|
||||||
bool cstoreTable = CStoreTable(relationId);
|
bool cstoreTable = IsCStoreFdwTable(relationId);
|
||||||
if (cstoreTable)
|
if (cstoreTable)
|
||||||
{
|
{
|
||||||
bool distributedTable = DistributedTable(relationId);
|
bool distributedTable = DistributedTable(relationId);
|
||||||
|
@ -558,12 +540,11 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString)
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* init state to write to the cstore file */
|
/* init state to write to the cstore file */
|
||||||
writeState = CStoreBeginWrite(relationId,
|
writeState = CStoreBeginWrite(relation,
|
||||||
cstoreOptions->compressionType,
|
cstoreOptions->compressionType,
|
||||||
cstoreOptions->stripeRowCount,
|
cstoreOptions->stripeRowCount,
|
||||||
cstoreOptions->blockRowCount,
|
cstoreOptions->blockRowCount,
|
||||||
tupleDescriptor);
|
tupleDescriptor);
|
||||||
writeState->relation = relation;
|
|
||||||
|
|
||||||
while (nextRowFound)
|
while (nextRowFound)
|
||||||
{
|
{
|
||||||
|
@ -686,7 +667,7 @@ CStoreProcessAlterTableCommand(AlterTableStmt *alterStatement)
|
||||||
}
|
}
|
||||||
|
|
||||||
relationId = RangeVarGetRelid(relationRangeVar, AccessShareLock, true);
|
relationId = RangeVarGetRelid(relationRangeVar, AccessShareLock, true);
|
||||||
if (!CStoreTable(relationId))
|
if (!IsCStoreFdwTable(relationId))
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -725,36 +706,6 @@ CStoreProcessAlterTableCommand(AlterTableStmt *alterStatement)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* DropppedCStoreRelidList extracts and returns the list of cstore relids
|
|
||||||
* from DROP table statement
|
|
||||||
*/
|
|
||||||
static List *
|
|
||||||
DroppedCStoreRelidList(DropStmt *dropStatement)
|
|
||||||
{
|
|
||||||
List *droppedCStoreRelidList = NIL;
|
|
||||||
|
|
||||||
if (dropStatement->removeType == OBJECT_FOREIGN_TABLE)
|
|
||||||
{
|
|
||||||
ListCell *dropObjectCell = NULL;
|
|
||||||
foreach(dropObjectCell, dropStatement->objects)
|
|
||||||
{
|
|
||||||
List *tableNameList = (List *) lfirst(dropObjectCell);
|
|
||||||
RangeVar *rangeVar = makeRangeVarFromNameList(tableNameList);
|
|
||||||
|
|
||||||
Oid relationId = RangeVarGetRelid(rangeVar, AccessShareLock, true);
|
|
||||||
if (CStoreTable(relationId))
|
|
||||||
{
|
|
||||||
droppedCStoreRelidList = lappend_oid(droppedCStoreRelidList,
|
|
||||||
relationId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return droppedCStoreRelidList;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* FindCStoreTables returns list of CStore tables from given table list */
|
/* FindCStoreTables returns list of CStore tables from given table list */
|
||||||
static List *
|
static List *
|
||||||
FindCStoreTables(List *tableList)
|
FindCStoreTables(List *tableList)
|
||||||
|
@ -765,7 +716,7 @@ FindCStoreTables(List *tableList)
|
||||||
{
|
{
|
||||||
RangeVar *rangeVar = (RangeVar *) lfirst(relationCell);
|
RangeVar *rangeVar = (RangeVar *) lfirst(relationCell);
|
||||||
Oid relationId = RangeVarGetRelid(rangeVar, AccessShareLock, true);
|
Oid relationId = RangeVarGetRelid(rangeVar, AccessShareLock, true);
|
||||||
if (CStoreTable(relationId) && !DistributedTable(relationId))
|
if (IsCStoreFdwTable(relationId) && !DistributedTable(relationId))
|
||||||
{
|
{
|
||||||
cstoreTableList = lappend(cstoreTableList, rangeVar);
|
cstoreTableList = lappend(cstoreTableList, rangeVar);
|
||||||
}
|
}
|
||||||
|
@ -824,11 +775,12 @@ TruncateCStoreTables(List *cstoreRelationList)
|
||||||
{
|
{
|
||||||
Relation relation = (Relation) lfirst(relationCell);
|
Relation relation = (Relation) lfirst(relationCell);
|
||||||
Oid relationId = relation->rd_id;
|
Oid relationId = relation->rd_id;
|
||||||
|
CStoreOptions *options = CStoreGetOptions(relationId);
|
||||||
|
|
||||||
Assert(CStoreTable(relationId));
|
Assert(IsCStoreFdwTable(relationId));
|
||||||
|
|
||||||
FdwNewRelFileNode(relation);
|
FdwNewRelFileNode(relation);
|
||||||
InitializeCStoreTableFile(relationId, relation, CStoreGetOptions(relationId));
|
InitCStoreDataFileMetadata(relation->rd_node.relNode, options->blockRowCount);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -861,7 +813,6 @@ FdwNewRelFileNode(Relation relation)
|
||||||
Relation tmprel;
|
Relation tmprel;
|
||||||
Oid tablespace;
|
Oid tablespace;
|
||||||
Oid filenode;
|
Oid filenode;
|
||||||
RelFileNode newrnode;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Upgrade to AccessExclusiveLock, and hold until the end of the
|
* Upgrade to AccessExclusiveLock, and hold until the end of the
|
||||||
|
@ -887,10 +838,6 @@ FdwNewRelFileNode(Relation relation)
|
||||||
|
|
||||||
filenode = GetNewRelFileNode(tablespace, NULL, persistence);
|
filenode = GetNewRelFileNode(tablespace, NULL, persistence);
|
||||||
|
|
||||||
newrnode.spcNode = tablespace;
|
|
||||||
newrnode.dbNode = MyDatabaseId;
|
|
||||||
newrnode.relNode = filenode;
|
|
||||||
|
|
||||||
classform->relfilenode = filenode;
|
classform->relfilenode = filenode;
|
||||||
classform->relpages = 0; /* it's empty until further notice */
|
classform->relpages = 0; /* it's empty until further notice */
|
||||||
classform->reltuples = 0;
|
classform->reltuples = 0;
|
||||||
|
@ -900,6 +847,10 @@ FdwNewRelFileNode(Relation relation)
|
||||||
|
|
||||||
CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
|
CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
|
||||||
CommandCounterIncrement();
|
CommandCounterIncrement();
|
||||||
|
|
||||||
|
relation->rd_node.spcNode = tablespace;
|
||||||
|
relation->rd_node.dbNode = MyDatabaseId;
|
||||||
|
relation->rd_node.relNode = filenode;
|
||||||
}
|
}
|
||||||
|
|
||||||
heap_freetuple(tuple);
|
heap_freetuple(tuple);
|
||||||
|
@ -928,11 +879,11 @@ FdwCreateStorage(Relation relation)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CStoreTable checks if the given table name belongs to a foreign columnar store
|
* IsCStoreFdwTable checks if the given table name belongs to a foreign columnar store
|
||||||
* table. If it does, the function returns true. Otherwise, it returns false.
|
* table. If it does, the function returns true. Otherwise, it returns false.
|
||||||
*/
|
*/
|
||||||
static bool
|
bool
|
||||||
CStoreTable(Oid relationId)
|
IsCStoreFdwTable(Oid relationId)
|
||||||
{
|
{
|
||||||
bool cstoreTable = false;
|
bool cstoreTable = false;
|
||||||
char relationKind = 0;
|
char relationKind = 0;
|
||||||
|
@ -947,7 +898,7 @@ CStoreTable(Oid relationId)
|
||||||
{
|
{
|
||||||
ForeignTable *foreignTable = GetForeignTable(relationId);
|
ForeignTable *foreignTable = GetForeignTable(relationId);
|
||||||
ForeignServer *server = GetForeignServer(foreignTable->serverid);
|
ForeignServer *server = GetForeignServer(foreignTable->serverid);
|
||||||
if (CStoreServer(server))
|
if (IsCStoreServer(server))
|
||||||
{
|
{
|
||||||
cstoreTable = true;
|
cstoreTable = true;
|
||||||
}
|
}
|
||||||
|
@ -958,11 +909,11 @@ CStoreTable(Oid relationId)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CStoreServer checks if the given foreign server belongs to cstore_fdw. If it
|
* IsCStoreServer checks if the given foreign server belongs to cstore_fdw. If it
|
||||||
* does, the function returns true. Otherwise, it returns false.
|
* does, the function returns true. Otherwise, it returns false.
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
CStoreServer(ForeignServer *server)
|
IsCStoreServer(ForeignServer *server)
|
||||||
{
|
{
|
||||||
ForeignDataWrapper *foreignDataWrapper = GetForeignDataWrapper(server->fdwid);
|
ForeignDataWrapper *foreignDataWrapper = GetForeignDataWrapper(server->fdwid);
|
||||||
bool cstoreServer = false;
|
bool cstoreServer = false;
|
||||||
|
@ -1055,7 +1006,7 @@ Datum
|
||||||
cstore_table_size(PG_FUNCTION_ARGS)
|
cstore_table_size(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
Oid relationId = PG_GETARG_OID(0);
|
Oid relationId = PG_GETARG_OID(0);
|
||||||
bool cstoreTable = CStoreTable(relationId);
|
bool cstoreTable = IsCStoreFdwTable(relationId);
|
||||||
Relation relation;
|
Relation relation;
|
||||||
BlockNumber nblocks;
|
BlockNumber nblocks;
|
||||||
|
|
||||||
|
@ -1705,6 +1656,7 @@ CStoreBeginForeignScan(ForeignScanState *scanState, int executorFlags)
|
||||||
ForeignScan *foreignScan = NULL;
|
ForeignScan *foreignScan = NULL;
|
||||||
List *foreignPrivateList = NIL;
|
List *foreignPrivateList = NIL;
|
||||||
List *whereClauseList = NIL;
|
List *whereClauseList = NIL;
|
||||||
|
Relation relation = NULL;
|
||||||
|
|
||||||
cstore_fdw_initrel(currentRelation);
|
cstore_fdw_initrel(currentRelation);
|
||||||
|
|
||||||
|
@ -1721,9 +1673,8 @@ CStoreBeginForeignScan(ForeignScanState *scanState, int executorFlags)
|
||||||
whereClauseList = foreignScan->scan.plan.qual;
|
whereClauseList = foreignScan->scan.plan.qual;
|
||||||
|
|
||||||
columnList = (List *) linitial(foreignPrivateList);
|
columnList = (List *) linitial(foreignPrivateList);
|
||||||
readState = CStoreBeginRead(foreignTableId,
|
relation = cstore_fdw_open(foreignTableId, AccessShareLock);
|
||||||
tupleDescriptor, columnList, whereClauseList);
|
readState = CStoreBeginRead(relation, tupleDescriptor, columnList, whereClauseList);
|
||||||
readState->relation = cstore_fdw_open(foreignTableId, AccessShareLock);
|
|
||||||
|
|
||||||
scanState->fdw_state = (void *) readState;
|
scanState->fdw_state = (void *) readState;
|
||||||
}
|
}
|
||||||
|
@ -2067,13 +2018,12 @@ CStoreBeginForeignInsert(ModifyTableState *modifyTableState, ResultRelInfo *rela
|
||||||
cstoreOptions = CStoreGetOptions(foreignTableOid);
|
cstoreOptions = CStoreGetOptions(foreignTableOid);
|
||||||
tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc);
|
tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc);
|
||||||
|
|
||||||
writeState = CStoreBeginWrite(foreignTableOid,
|
writeState = CStoreBeginWrite(relation,
|
||||||
cstoreOptions->compressionType,
|
cstoreOptions->compressionType,
|
||||||
cstoreOptions->stripeRowCount,
|
cstoreOptions->stripeRowCount,
|
||||||
cstoreOptions->blockRowCount,
|
cstoreOptions->blockRowCount,
|
||||||
tupleDescriptor);
|
tupleDescriptor);
|
||||||
|
|
||||||
writeState->relation = relation;
|
|
||||||
relationInfo->ri_FdwState = (void *) writeState;
|
relationInfo->ri_FdwState = (void *) writeState;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2216,3 +2166,61 @@ cstore_fdw_openrv(RangeVar *relation, LOCKMODE lockmode)
|
||||||
|
|
||||||
return rel;
|
return rel;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Implements object_access_hook. One of the places this is called is just
|
||||||
|
* before dropping an object, which allows us to clean-up resources for
|
||||||
|
* cstore tables.
|
||||||
|
*
|
||||||
|
* When cleaning up resources, we need to have access to the pg_class record
|
||||||
|
* for the table so we can indentify the relfilenode belonging to the relation.
|
||||||
|
* We don't have access to this information in sql_drop event triggers, since
|
||||||
|
* the relation has already been dropped there. object_access_hook is called
|
||||||
|
* __before__ dropping tables, so we still have access to the pg_class
|
||||||
|
* entry here.
|
||||||
|
*
|
||||||
|
* Note that the utility hook is called once per __command__, and not for
|
||||||
|
* every object dropped, and since a drop can cascade to other objects, it
|
||||||
|
* is difficult to get full set of dropped objects in the utility hook.
|
||||||
|
* But object_access_hook is called once per dropped object, so it is
|
||||||
|
* much easier to clean-up all dropped objects here.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
CStoreFdwObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId,
|
||||||
|
int subId, void *arg)
|
||||||
|
{
|
||||||
|
if (prevObjectAccessHook)
|
||||||
|
{
|
||||||
|
prevObjectAccessHook(access, classId, objectId, subId, arg);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Do nothing if this is not a DROP relation command.
|
||||||
|
*/
|
||||||
|
if (access != OAT_DROP || classId != RelationRelationId || OidIsValid(subId))
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Lock relation to prevent it from being dropped and to avoid
|
||||||
|
* race conditions in the next if block.
|
||||||
|
*/
|
||||||
|
LockRelationOid(objectId, AccessShareLock);
|
||||||
|
|
||||||
|
if (IsCStoreFdwTable(objectId))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Drop both metadata and storage. We need to drop storage here since
|
||||||
|
* we manage relfilenode for FDW tables in the extension.
|
||||||
|
*/
|
||||||
|
Relation rel = cstore_fdw_open(objectId, AccessExclusiveLock);
|
||||||
|
RelationOpenSmgr(rel);
|
||||||
|
RelationDropStorage(rel);
|
||||||
|
DeleteDataFileMetadataRowIfExists(rel->rd_node.relNode);
|
||||||
|
|
||||||
|
/* keep the lock since we did physical changes to the relation */
|
||||||
|
relation_close(rel, NoLock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -45,13 +45,12 @@ typedef struct
|
||||||
|
|
||||||
static Oid CStoreStripesRelationId(void);
|
static Oid CStoreStripesRelationId(void);
|
||||||
static Oid CStoreStripesIndexRelationId(void);
|
static Oid CStoreStripesIndexRelationId(void);
|
||||||
static Oid CStoreTablesRelationId(void);
|
static Oid CStoreDataFilesRelationId(void);
|
||||||
static Oid CStoreTablesIndexRelationId(void);
|
static Oid CStoreDataFilesIndexRelationId(void);
|
||||||
static Oid CStoreSkipNodesRelationId(void);
|
static Oid CStoreSkipNodesRelationId(void);
|
||||||
static Oid CStoreSkipNodesIndexRelationId(void);
|
static Oid CStoreSkipNodesIndexRelationId(void);
|
||||||
static Oid CStoreNamespaceId(void);
|
static Oid CStoreNamespaceId(void);
|
||||||
static int TableBlockRowCount(Oid relid);
|
static bool ReadCStoreDataFiles(Oid relfilenode, uint64 *blockRowCount);
|
||||||
static void DeleteTableMetadataRowIfExists(Oid relid);
|
|
||||||
static ModifyState * StartModifyRelation(Relation rel);
|
static ModifyState * StartModifyRelation(Relation rel);
|
||||||
static void InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values,
|
static void InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values,
|
||||||
bool *nulls);
|
bool *nulls);
|
||||||
|
@ -62,15 +61,15 @@ static bytea * DatumToBytea(Datum value, Form_pg_attribute attrForm);
|
||||||
static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm);
|
static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm);
|
||||||
|
|
||||||
/* constants for cstore_table */
|
/* constants for cstore_table */
|
||||||
#define Natts_cstore_tables 4
|
#define Natts_cstore_data_files 4
|
||||||
#define Anum_cstore_tables_relid 1
|
#define Anum_cstore_data_files_relfilenode 1
|
||||||
#define Anum_cstore_tables_block_row_count 2
|
#define Anum_cstore_data_files_block_row_count 2
|
||||||
#define Anum_cstore_tables_version_major 3
|
#define Anum_cstore_data_files_version_major 3
|
||||||
#define Anum_cstore_tables_version_minor 4
|
#define Anum_cstore_data_files_version_minor 4
|
||||||
|
|
||||||
/* constants for cstore_stripe */
|
/* constants for cstore_stripe */
|
||||||
#define Natts_cstore_stripes 8
|
#define Natts_cstore_stripes 8
|
||||||
#define Anum_cstore_stripes_relid 1
|
#define Anum_cstore_stripes_relfilenode 1
|
||||||
#define Anum_cstore_stripes_stripe 2
|
#define Anum_cstore_stripes_stripe 2
|
||||||
#define Anum_cstore_stripes_file_offset 3
|
#define Anum_cstore_stripes_file_offset 3
|
||||||
#define Anum_cstore_stripes_data_length 4
|
#define Anum_cstore_stripes_data_length 4
|
||||||
|
@ -81,7 +80,7 @@ static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm);
|
||||||
|
|
||||||
/* constants for cstore_skipnodes */
|
/* constants for cstore_skipnodes */
|
||||||
#define Natts_cstore_skipnodes 12
|
#define Natts_cstore_skipnodes 12
|
||||||
#define Anum_cstore_skipnodes_relid 1
|
#define Anum_cstore_skipnodes_relfilenode 1
|
||||||
#define Anum_cstore_skipnodes_stripe 2
|
#define Anum_cstore_skipnodes_stripe 2
|
||||||
#define Anum_cstore_skipnodes_attr 3
|
#define Anum_cstore_skipnodes_attr 3
|
||||||
#define Anum_cstore_skipnodes_block 4
|
#define Anum_cstore_skipnodes_block 4
|
||||||
|
@ -96,35 +95,36 @@ static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* InitCStoreTableMetadata adds a record for the given relation in cstore_table.
|
* InitCStoreDataFileMetadata adds a record for the given relfilenode
|
||||||
|
* in cstore_data_files.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
InitCStoreTableMetadata(Oid relid, int blockRowCount)
|
InitCStoreDataFileMetadata(Oid relfilenode, int blockRowCount)
|
||||||
{
|
{
|
||||||
Oid cstoreTablesOid = InvalidOid;
|
Oid cstoreDataFilesOid = InvalidOid;
|
||||||
Relation cstoreTables = NULL;
|
Relation cstoreDataFiles = NULL;
|
||||||
ModifyState *modifyState = NULL;
|
ModifyState *modifyState = NULL;
|
||||||
|
|
||||||
bool nulls[Natts_cstore_tables] = { 0 };
|
bool nulls[Natts_cstore_data_files] = { 0 };
|
||||||
Datum values[Natts_cstore_tables] = {
|
Datum values[Natts_cstore_data_files] = {
|
||||||
ObjectIdGetDatum(relid),
|
ObjectIdGetDatum(relfilenode),
|
||||||
Int32GetDatum(blockRowCount),
|
Int32GetDatum(blockRowCount),
|
||||||
Int32GetDatum(CSTORE_VERSION_MAJOR),
|
Int32GetDatum(CSTORE_VERSION_MAJOR),
|
||||||
Int32GetDatum(CSTORE_VERSION_MINOR)
|
Int32GetDatum(CSTORE_VERSION_MINOR)
|
||||||
};
|
};
|
||||||
|
|
||||||
DeleteTableMetadataRowIfExists(relid);
|
DeleteDataFileMetadataRowIfExists(relfilenode);
|
||||||
|
|
||||||
cstoreTablesOid = CStoreTablesRelationId();
|
cstoreDataFilesOid = CStoreDataFilesRelationId();
|
||||||
cstoreTables = heap_open(cstoreTablesOid, RowExclusiveLock);
|
cstoreDataFiles = heap_open(cstoreDataFilesOid, RowExclusiveLock);
|
||||||
|
|
||||||
modifyState = StartModifyRelation(cstoreTables);
|
modifyState = StartModifyRelation(cstoreDataFiles);
|
||||||
InsertTupleAndEnforceConstraints(modifyState, values, nulls);
|
InsertTupleAndEnforceConstraints(modifyState, values, nulls);
|
||||||
FinishModifyRelation(modifyState);
|
FinishModifyRelation(modifyState);
|
||||||
|
|
||||||
CommandCounterIncrement();
|
CommandCounterIncrement();
|
||||||
|
|
||||||
heap_close(cstoreTables, NoLock);
|
heap_close(cstoreDataFiles, NoLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -133,7 +133,7 @@ InitCStoreTableMetadata(Oid relid, int blockRowCount)
|
||||||
* of cstore_skipnodes.
|
* of cstore_skipnodes.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
SaveStripeSkipList(Oid relid, uint64 stripe, StripeSkipList *stripeSkipList,
|
SaveStripeSkipList(Oid relfilenode, uint64 stripe, StripeSkipList *stripeSkipList,
|
||||||
TupleDesc tupleDescriptor)
|
TupleDesc tupleDescriptor)
|
||||||
{
|
{
|
||||||
uint32 columnIndex = 0;
|
uint32 columnIndex = 0;
|
||||||
|
@ -155,7 +155,7 @@ SaveStripeSkipList(Oid relid, uint64 stripe, StripeSkipList *stripeSkipList,
|
||||||
&stripeSkipList->blockSkipNodeArray[columnIndex][blockIndex];
|
&stripeSkipList->blockSkipNodeArray[columnIndex][blockIndex];
|
||||||
|
|
||||||
Datum values[Natts_cstore_skipnodes] = {
|
Datum values[Natts_cstore_skipnodes] = {
|
||||||
ObjectIdGetDatum(relid),
|
ObjectIdGetDatum(relfilenode),
|
||||||
Int64GetDatum(stripe),
|
Int64GetDatum(stripe),
|
||||||
Int32GetDatum(columnIndex + 1),
|
Int32GetDatum(columnIndex + 1),
|
||||||
Int32GetDatum(blockIndex),
|
Int32GetDatum(blockIndex),
|
||||||
|
@ -201,7 +201,7 @@ SaveStripeSkipList(Oid relid, uint64 stripe, StripeSkipList *stripeSkipList,
|
||||||
* ReadStripeSkipList fetches StripeSkipList for a given stripe.
|
* ReadStripeSkipList fetches StripeSkipList for a given stripe.
|
||||||
*/
|
*/
|
||||||
StripeSkipList *
|
StripeSkipList *
|
||||||
ReadStripeSkipList(Oid relid, uint64 stripe, TupleDesc tupleDescriptor,
|
ReadStripeSkipList(Oid relfilenode, uint64 stripe, TupleDesc tupleDescriptor,
|
||||||
uint32 blockCount)
|
uint32 blockCount)
|
||||||
{
|
{
|
||||||
StripeSkipList *skipList = NULL;
|
StripeSkipList *skipList = NULL;
|
||||||
|
@ -218,8 +218,8 @@ ReadStripeSkipList(Oid relid, uint64 stripe, TupleDesc tupleDescriptor,
|
||||||
cstoreSkipNodes = heap_open(cstoreSkipNodesOid, AccessShareLock);
|
cstoreSkipNodes = heap_open(cstoreSkipNodesOid, AccessShareLock);
|
||||||
index = index_open(CStoreSkipNodesIndexRelationId(), AccessShareLock);
|
index = index_open(CStoreSkipNodesIndexRelationId(), AccessShareLock);
|
||||||
|
|
||||||
ScanKeyInit(&scanKey[0], Anum_cstore_skipnodes_relid,
|
ScanKeyInit(&scanKey[0], Anum_cstore_skipnodes_relfilenode,
|
||||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relid));
|
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relfilenode));
|
||||||
ScanKeyInit(&scanKey[1], Anum_cstore_skipnodes_stripe,
|
ScanKeyInit(&scanKey[1], Anum_cstore_skipnodes_stripe,
|
||||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe));
|
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe));
|
||||||
|
|
||||||
|
@ -311,11 +311,11 @@ ReadStripeSkipList(Oid relid, uint64 stripe, TupleDesc tupleDescriptor,
|
||||||
* InsertStripeMetadataRow adds a row to cstore_stripes.
|
* InsertStripeMetadataRow adds a row to cstore_stripes.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe)
|
InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe)
|
||||||
{
|
{
|
||||||
bool nulls[Natts_cstore_stripes] = { 0 };
|
bool nulls[Natts_cstore_stripes] = { 0 };
|
||||||
Datum values[Natts_cstore_stripes] = {
|
Datum values[Natts_cstore_stripes] = {
|
||||||
ObjectIdGetDatum(relid),
|
ObjectIdGetDatum(relfilenode),
|
||||||
Int64GetDatum(stripe->id),
|
Int64GetDatum(stripe->id),
|
||||||
Int64GetDatum(stripe->fileOffset),
|
Int64GetDatum(stripe->fileOffset),
|
||||||
Int64GetDatum(stripe->dataLength),
|
Int64GetDatum(stripe->dataLength),
|
||||||
|
@ -339,11 +339,11 @@ InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ReadTableMetadata constructs TableMetadata for a given relid by reading
|
* ReadDataFileMetadata constructs DataFileMetadata for a given relfilenode by reading
|
||||||
* from cstore_tables and cstore_stripes.
|
* from cstore_data_files and cstore_stripes.
|
||||||
*/
|
*/
|
||||||
TableMetadata *
|
DataFileMetadata *
|
||||||
ReadTableMetadata(Oid relid)
|
ReadDataFileMetadata(Oid relfilenode)
|
||||||
{
|
{
|
||||||
Oid cstoreStripesOid = InvalidOid;
|
Oid cstoreStripesOid = InvalidOid;
|
||||||
Relation cstoreStripes = NULL;
|
Relation cstoreStripes = NULL;
|
||||||
|
@ -352,12 +352,18 @@ ReadTableMetadata(Oid relid)
|
||||||
ScanKeyData scanKey[1];
|
ScanKeyData scanKey[1];
|
||||||
SysScanDesc scanDescriptor = NULL;
|
SysScanDesc scanDescriptor = NULL;
|
||||||
HeapTuple heapTuple;
|
HeapTuple heapTuple;
|
||||||
|
bool found = false;
|
||||||
|
|
||||||
TableMetadata *tableMetadata = palloc0(sizeof(TableMetadata));
|
DataFileMetadata *datafileMetadata = palloc0(sizeof(DataFileMetadata));
|
||||||
tableMetadata->blockRowCount = TableBlockRowCount(relid);
|
found = ReadCStoreDataFiles(relfilenode, &datafileMetadata->blockRowCount);
|
||||||
|
if (!found)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("Relfilenode %d doesn't belong to a cstore table.",
|
||||||
|
relfilenode)));
|
||||||
|
}
|
||||||
|
|
||||||
ScanKeyInit(&scanKey[0], Anum_cstore_stripes_relid,
|
ScanKeyInit(&scanKey[0], Anum_cstore_stripes_relfilenode,
|
||||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relid));
|
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relfilenode));
|
||||||
|
|
||||||
cstoreStripesOid = CStoreStripesRelationId();
|
cstoreStripesOid = CStoreStripesRelationId();
|
||||||
cstoreStripes = heap_open(cstoreStripesOid, AccessShareLock);
|
cstoreStripes = heap_open(cstoreStripesOid, AccessShareLock);
|
||||||
|
@ -389,93 +395,120 @@ ReadTableMetadata(Oid relid)
|
||||||
stripeMetadata->rowCount = DatumGetInt64(
|
stripeMetadata->rowCount = DatumGetInt64(
|
||||||
datumArray[Anum_cstore_stripes_row_count - 1]);
|
datumArray[Anum_cstore_stripes_row_count - 1]);
|
||||||
|
|
||||||
tableMetadata->stripeMetadataList = lappend(tableMetadata->stripeMetadataList,
|
datafileMetadata->stripeMetadataList = lappend(
|
||||||
stripeMetadata);
|
datafileMetadata->stripeMetadataList,
|
||||||
|
stripeMetadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
systable_endscan_ordered(scanDescriptor);
|
systable_endscan_ordered(scanDescriptor);
|
||||||
index_close(index, NoLock);
|
index_close(index, NoLock);
|
||||||
heap_close(cstoreStripes, NoLock);
|
heap_close(cstoreStripes, NoLock);
|
||||||
|
|
||||||
return tableMetadata;
|
return datafileMetadata;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* TableBlockRowCount returns block_row_count column from cstore_tables for a given relid.
|
* ReadCStoreDataFiles reads corresponding record from cstore_data_files. Returns
|
||||||
|
* false if table was not found in cstore_data_files.
|
||||||
*/
|
*/
|
||||||
static int
|
static bool
|
||||||
TableBlockRowCount(Oid relid)
|
ReadCStoreDataFiles(Oid relfilenode, uint64 *blockRowCount)
|
||||||
{
|
{
|
||||||
int blockRowCount = 0;
|
bool found = false;
|
||||||
Oid cstoreTablesOid = InvalidOid;
|
Oid cstoreDataFilesOid = InvalidOid;
|
||||||
Relation cstoreTables = NULL;
|
Relation cstoreDataFiles = NULL;
|
||||||
Relation index = NULL;
|
Relation index = NULL;
|
||||||
TupleDesc tupleDescriptor = NULL;
|
TupleDesc tupleDescriptor = NULL;
|
||||||
ScanKeyData scanKey[1];
|
ScanKeyData scanKey[1];
|
||||||
SysScanDesc scanDescriptor = NULL;
|
SysScanDesc scanDescriptor = NULL;
|
||||||
HeapTuple heapTuple = NULL;
|
HeapTuple heapTuple = NULL;
|
||||||
|
|
||||||
ScanKeyInit(&scanKey[0], Anum_cstore_tables_relid,
|
ScanKeyInit(&scanKey[0], Anum_cstore_data_files_relfilenode,
|
||||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relid));
|
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relfilenode));
|
||||||
|
|
||||||
cstoreTablesOid = CStoreTablesRelationId();
|
cstoreDataFilesOid = CStoreDataFilesRelationId();
|
||||||
cstoreTables = heap_open(cstoreTablesOid, AccessShareLock);
|
cstoreDataFiles = try_relation_open(cstoreDataFilesOid, AccessShareLock);
|
||||||
index = index_open(CStoreTablesIndexRelationId(), AccessShareLock);
|
if (cstoreDataFiles == NULL)
|
||||||
tupleDescriptor = RelationGetDescr(cstoreTables);
|
{
|
||||||
|
/*
|
||||||
|
* Extension has been dropped. This can be called while
|
||||||
|
* dropping extension or database via ObjectAccess().
|
||||||
|
*/
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
scanDescriptor = systable_beginscan_ordered(cstoreTables, index, NULL, 1, scanKey);
|
index = try_relation_open(CStoreDataFilesIndexRelationId(), AccessShareLock);
|
||||||
|
if (index == NULL)
|
||||||
|
{
|
||||||
|
heap_close(cstoreDataFiles, NoLock);
|
||||||
|
|
||||||
|
/* extension has been dropped */
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
tupleDescriptor = RelationGetDescr(cstoreDataFiles);
|
||||||
|
|
||||||
|
scanDescriptor = systable_beginscan_ordered(cstoreDataFiles, index, NULL, 1, scanKey);
|
||||||
|
|
||||||
heapTuple = systable_getnext(scanDescriptor);
|
heapTuple = systable_getnext(scanDescriptor);
|
||||||
if (HeapTupleIsValid(heapTuple))
|
if (HeapTupleIsValid(heapTuple))
|
||||||
{
|
{
|
||||||
Datum datumArray[Natts_cstore_tables];
|
Datum datumArray[Natts_cstore_data_files];
|
||||||
bool isNullArray[Natts_cstore_tables];
|
bool isNullArray[Natts_cstore_data_files];
|
||||||
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
||||||
blockRowCount = DatumGetInt32(datumArray[Anum_cstore_tables_block_row_count - 1]);
|
*blockRowCount = DatumGetInt32(datumArray[Anum_cstore_data_files_block_row_count -
|
||||||
|
1]);
|
||||||
|
found = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
systable_endscan_ordered(scanDescriptor);
|
systable_endscan_ordered(scanDescriptor);
|
||||||
index_close(index, NoLock);
|
index_close(index, NoLock);
|
||||||
heap_close(cstoreTables, NoLock);
|
heap_close(cstoreDataFiles, NoLock);
|
||||||
|
|
||||||
return blockRowCount;
|
return found;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* DeleteTableMetadataRowIfExists removes the row with given relid from cstore_stripes.
|
* DeleteDataFileMetadataRowIfExists removes the row with given relfilenode from cstore_stripes.
|
||||||
*/
|
*/
|
||||||
static void
|
void
|
||||||
DeleteTableMetadataRowIfExists(Oid relid)
|
DeleteDataFileMetadataRowIfExists(Oid relfilenode)
|
||||||
{
|
{
|
||||||
Oid cstoreTablesOid = InvalidOid;
|
Oid cstoreDataFilesOid = InvalidOid;
|
||||||
Relation cstoreTables = NULL;
|
Relation cstoreDataFiles = NULL;
|
||||||
Relation index = NULL;
|
Relation index = NULL;
|
||||||
ScanKeyData scanKey[1];
|
ScanKeyData scanKey[1];
|
||||||
SysScanDesc scanDescriptor = NULL;
|
SysScanDesc scanDescriptor = NULL;
|
||||||
HeapTuple heapTuple = NULL;
|
HeapTuple heapTuple = NULL;
|
||||||
|
|
||||||
ScanKeyInit(&scanKey[0], Anum_cstore_tables_relid,
|
ScanKeyInit(&scanKey[0], Anum_cstore_data_files_relfilenode,
|
||||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relid));
|
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relfilenode));
|
||||||
|
|
||||||
cstoreTablesOid = CStoreTablesRelationId();
|
cstoreDataFilesOid = CStoreDataFilesRelationId();
|
||||||
cstoreTables = heap_open(cstoreTablesOid, AccessShareLock);
|
cstoreDataFiles = try_relation_open(cstoreDataFilesOid, AccessShareLock);
|
||||||
index = index_open(CStoreTablesIndexRelationId(), AccessShareLock);
|
if (cstoreDataFiles == NULL)
|
||||||
|
{
|
||||||
|
/* extension has been dropped */
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
scanDescriptor = systable_beginscan_ordered(cstoreTables, index, NULL, 1, scanKey);
|
index = index_open(CStoreDataFilesIndexRelationId(), AccessShareLock);
|
||||||
|
|
||||||
|
scanDescriptor = systable_beginscan_ordered(cstoreDataFiles, index, NULL, 1, scanKey);
|
||||||
|
|
||||||
heapTuple = systable_getnext(scanDescriptor);
|
heapTuple = systable_getnext(scanDescriptor);
|
||||||
if (HeapTupleIsValid(heapTuple))
|
if (HeapTupleIsValid(heapTuple))
|
||||||
{
|
{
|
||||||
ModifyState *modifyState = StartModifyRelation(cstoreTables);
|
ModifyState *modifyState = StartModifyRelation(cstoreDataFiles);
|
||||||
DeleteTupleAndEnforceConstraints(modifyState, heapTuple);
|
DeleteTupleAndEnforceConstraints(modifyState, heapTuple);
|
||||||
FinishModifyRelation(modifyState);
|
FinishModifyRelation(modifyState);
|
||||||
}
|
}
|
||||||
|
|
||||||
systable_endscan_ordered(scanDescriptor);
|
systable_endscan_ordered(scanDescriptor);
|
||||||
index_close(index, NoLock);
|
index_close(index, NoLock);
|
||||||
heap_close(cstoreTables, NoLock);
|
heap_close(cstoreDataFiles, NoLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -680,24 +713,24 @@ CStoreStripesIndexRelationId(void)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CStoreTablesRelationId returns relation id of cstore_tables.
|
* CStoreDataFilesRelationId returns relation id of cstore_data_files.
|
||||||
* TODO: should we cache this similar to citus?
|
* TODO: should we cache this similar to citus?
|
||||||
*/
|
*/
|
||||||
static Oid
|
static Oid
|
||||||
CStoreTablesRelationId(void)
|
CStoreDataFilesRelationId(void)
|
||||||
{
|
{
|
||||||
return get_relname_relid("cstore_tables", CStoreNamespaceId());
|
return get_relname_relid("cstore_data_files", CStoreNamespaceId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CStoreTablesIndexRelationId returns relation id of cstore_tables_idx.
|
* CStoreDataFilesIndexRelationId returns relation id of cstore_data_files_pkey.
|
||||||
* TODO: should we cache this similar to citus?
|
* TODO: should we cache this similar to citus?
|
||||||
*/
|
*/
|
||||||
static Oid
|
static Oid
|
||||||
CStoreTablesIndexRelationId(void)
|
CStoreDataFilesIndexRelationId(void)
|
||||||
{
|
{
|
||||||
return get_relname_relid("cstore_tables_pkey", CStoreNamespaceId());
|
return get_relname_relid("cstore_data_files_pkey", CStoreNamespaceId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -80,14 +80,15 @@ static StringInfo ReadFromSmgr(Relation rel, uint64 offset, uint32 size);
|
||||||
* read handle that's used during reading rows and finishing the read operation.
|
* read handle that's used during reading rows and finishing the read operation.
|
||||||
*/
|
*/
|
||||||
TableReadState *
|
TableReadState *
|
||||||
CStoreBeginRead(Oid relationId, TupleDesc tupleDescriptor,
|
CStoreBeginRead(Relation relation, TupleDesc tupleDescriptor,
|
||||||
List *projectedColumnList, List *whereClauseList)
|
List *projectedColumnList, List *whereClauseList)
|
||||||
{
|
{
|
||||||
TableReadState *readState = NULL;
|
TableReadState *readState = NULL;
|
||||||
TableMetadata *tableMetadata = NULL;
|
DataFileMetadata *datafileMetadata = NULL;
|
||||||
MemoryContext stripeReadContext = NULL;
|
MemoryContext stripeReadContext = NULL;
|
||||||
|
Oid relNode = relation->rd_node.relNode;
|
||||||
|
|
||||||
tableMetadata = ReadTableMetadata(relationId);
|
datafileMetadata = ReadDataFileMetadata(relNode);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We allocate all stripe specific data in the stripeReadContext, and reset
|
* We allocate all stripe specific data in the stripeReadContext, and reset
|
||||||
|
@ -99,8 +100,8 @@ CStoreBeginRead(Oid relationId, TupleDesc tupleDescriptor,
|
||||||
ALLOCSET_DEFAULT_SIZES);
|
ALLOCSET_DEFAULT_SIZES);
|
||||||
|
|
||||||
readState = palloc0(sizeof(TableReadState));
|
readState = palloc0(sizeof(TableReadState));
|
||||||
readState->relationId = relationId;
|
readState->relation = relation;
|
||||||
readState->tableMetadata = tableMetadata;
|
readState->datafileMetadata = datafileMetadata;
|
||||||
readState->projectedColumnList = projectedColumnList;
|
readState->projectedColumnList = projectedColumnList;
|
||||||
readState->whereClauseList = whereClauseList;
|
readState->whereClauseList = whereClauseList;
|
||||||
readState->stripeBuffers = NULL;
|
readState->stripeBuffers = NULL;
|
||||||
|
@ -138,7 +139,7 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu
|
||||||
{
|
{
|
||||||
StripeBuffers *stripeBuffers = NULL;
|
StripeBuffers *stripeBuffers = NULL;
|
||||||
StripeMetadata *stripeMetadata = NULL;
|
StripeMetadata *stripeMetadata = NULL;
|
||||||
List *stripeMetadataList = readState->tableMetadata->stripeMetadataList;
|
List *stripeMetadataList = readState->datafileMetadata->stripeMetadataList;
|
||||||
uint32 stripeCount = list_length(stripeMetadataList);
|
uint32 stripeCount = list_length(stripeMetadataList);
|
||||||
|
|
||||||
/* if we have read all stripes, return false */
|
/* if we have read all stripes, return false */
|
||||||
|
@ -228,8 +229,8 @@ void
|
||||||
CStoreEndRead(TableReadState *readState)
|
CStoreEndRead(TableReadState *readState)
|
||||||
{
|
{
|
||||||
MemoryContextDelete(readState->stripeReadContext);
|
MemoryContextDelete(readState->stripeReadContext);
|
||||||
list_free_deep(readState->tableMetadata->stripeMetadataList);
|
list_free_deep(readState->datafileMetadata->stripeMetadataList);
|
||||||
pfree(readState->tableMetadata);
|
pfree(readState->datafileMetadata);
|
||||||
pfree(readState);
|
pfree(readState);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -304,13 +305,13 @@ FreeBlockData(BlockData *blockData)
|
||||||
uint64
|
uint64
|
||||||
CStoreTableRowCount(Relation relation)
|
CStoreTableRowCount(Relation relation)
|
||||||
{
|
{
|
||||||
TableMetadata *tableMetadata = NULL;
|
DataFileMetadata *datafileMetadata = NULL;
|
||||||
ListCell *stripeMetadataCell = NULL;
|
ListCell *stripeMetadataCell = NULL;
|
||||||
uint64 totalRowCount = 0;
|
uint64 totalRowCount = 0;
|
||||||
|
|
||||||
tableMetadata = ReadTableMetadata(relation->rd_id);
|
datafileMetadata = ReadDataFileMetadata(relation->rd_node.relNode);
|
||||||
|
|
||||||
foreach(stripeMetadataCell, tableMetadata->stripeMetadataList)
|
foreach(stripeMetadataCell, datafileMetadata->stripeMetadataList)
|
||||||
{
|
{
|
||||||
StripeMetadata *stripeMetadata = (StripeMetadata *) lfirst(stripeMetadataCell);
|
StripeMetadata *stripeMetadata = (StripeMetadata *) lfirst(stripeMetadataCell);
|
||||||
totalRowCount += stripeMetadata->rowCount;
|
totalRowCount += stripeMetadata->rowCount;
|
||||||
|
@ -337,7 +338,7 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
||||||
|
|
||||||
bool *projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList);
|
bool *projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList);
|
||||||
|
|
||||||
StripeSkipList *stripeSkipList = ReadStripeSkipList(RelationGetRelid(relation),
|
StripeSkipList *stripeSkipList = ReadStripeSkipList(relation->rd_node.relNode,
|
||||||
stripeMetadata->id,
|
stripeMetadata->id,
|
||||||
tupleDescriptor,
|
tupleDescriptor,
|
||||||
stripeMetadata->blockCount);
|
stripeMetadata->blockCount);
|
||||||
|
|
101
cstore_tableam.c
101
cstore_tableam.c
|
@ -14,6 +14,8 @@
|
||||||
#include "access/xact.h"
|
#include "access/xact.h"
|
||||||
#include "catalog/catalog.h"
|
#include "catalog/catalog.h"
|
||||||
#include "catalog/index.h"
|
#include "catalog/index.h"
|
||||||
|
#include "catalog/objectaccess.h"
|
||||||
|
#include "catalog/pg_am.h"
|
||||||
#include "catalog/storage.h"
|
#include "catalog/storage.h"
|
||||||
#include "catalog/storage_xlog.h"
|
#include "catalog/storage_xlog.h"
|
||||||
#include "commands/progress.h"
|
#include "commands/progress.h"
|
||||||
|
@ -30,10 +32,13 @@
|
||||||
#include "storage/smgr.h"
|
#include "storage/smgr.h"
|
||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
|
#include "utils/syscache.h"
|
||||||
|
|
||||||
#include "cstore.h"
|
#include "cstore.h"
|
||||||
#include "cstore_tableam.h"
|
#include "cstore_tableam.h"
|
||||||
|
|
||||||
|
#define CSTORE_TABLEAM_NAME "cstore_tableam"
|
||||||
|
|
||||||
typedef struct CStoreScanDescData
|
typedef struct CStoreScanDescData
|
||||||
{
|
{
|
||||||
TableScanDescData cs_base;
|
TableScanDescData cs_base;
|
||||||
|
@ -45,6 +50,13 @@ typedef struct CStoreScanDescData *CStoreScanDesc;
|
||||||
static TableWriteState *CStoreWriteState = NULL;
|
static TableWriteState *CStoreWriteState = NULL;
|
||||||
static ExecutorEnd_hook_type PreviousExecutorEndHook = NULL;
|
static ExecutorEnd_hook_type PreviousExecutorEndHook = NULL;
|
||||||
static MemoryContext CStoreContext = NULL;
|
static MemoryContext CStoreContext = NULL;
|
||||||
|
static object_access_hook_type prevObjectAccessHook = NULL;
|
||||||
|
|
||||||
|
/* forward declaration for static functions */
|
||||||
|
static void CStoreTableAMObjectAccessHook(ObjectAccessType access, Oid classId, Oid
|
||||||
|
objectId, int subId,
|
||||||
|
void *arg);
|
||||||
|
static bool IsCStoreTableAmTable(Oid relationId);
|
||||||
|
|
||||||
static CStoreOptions *
|
static CStoreOptions *
|
||||||
CStoreTableAMGetOptions(void)
|
CStoreTableAMGetOptions(void)
|
||||||
|
@ -97,13 +109,11 @@ cstore_init_write_state(Relation relation)
|
||||||
TupleDesc tupdesc = RelationGetDescr(relation);
|
TupleDesc tupdesc = RelationGetDescr(relation);
|
||||||
|
|
||||||
elog(LOG, "initializing write state for relation %d", relation->rd_id);
|
elog(LOG, "initializing write state for relation %d", relation->rd_id);
|
||||||
CStoreWriteState = CStoreBeginWrite(relation->rd_id,
|
CStoreWriteState = CStoreBeginWrite(relation,
|
||||||
cstoreOptions->compressionType,
|
cstoreOptions->compressionType,
|
||||||
cstoreOptions->stripeRowCount,
|
cstoreOptions->stripeRowCount,
|
||||||
cstoreOptions->blockRowCount,
|
cstoreOptions->blockRowCount,
|
||||||
tupdesc);
|
tupdesc);
|
||||||
|
|
||||||
CStoreWriteState->relation = relation;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -134,16 +144,12 @@ cstore_beginscan(Relation relation, Snapshot snapshot,
|
||||||
ParallelTableScanDesc parallel_scan,
|
ParallelTableScanDesc parallel_scan,
|
||||||
uint32 flags)
|
uint32 flags)
|
||||||
{
|
{
|
||||||
Oid relid = relation->rd_id;
|
|
||||||
TupleDesc tupdesc = relation->rd_att;
|
TupleDesc tupdesc = relation->rd_att;
|
||||||
CStoreOptions *cstoreOptions = NULL;
|
|
||||||
TableReadState *readState = NULL;
|
TableReadState *readState = NULL;
|
||||||
CStoreScanDesc scan = palloc(sizeof(CStoreScanDescData));
|
CStoreScanDesc scan = palloc(sizeof(CStoreScanDescData));
|
||||||
List *columnList = NIL;
|
List *columnList = NIL;
|
||||||
MemoryContext oldContext = MemoryContextSwitchTo(GetCStoreMemoryContext());
|
MemoryContext oldContext = MemoryContextSwitchTo(GetCStoreMemoryContext());
|
||||||
|
|
||||||
cstoreOptions = CStoreTableAMGetOptions();
|
|
||||||
|
|
||||||
scan->cs_base.rs_rd = relation;
|
scan->cs_base.rs_rd = relation;
|
||||||
scan->cs_base.rs_snapshot = snapshot;
|
scan->cs_base.rs_snapshot = snapshot;
|
||||||
scan->cs_base.rs_nkeys = nkeys;
|
scan->cs_base.rs_nkeys = nkeys;
|
||||||
|
@ -171,8 +177,7 @@ cstore_beginscan(Relation relation, Snapshot snapshot,
|
||||||
columnList = lappend(columnList, var);
|
columnList = lappend(columnList, var);
|
||||||
}
|
}
|
||||||
|
|
||||||
readState = CStoreBeginRead(relid, tupdesc, columnList, NULL);
|
readState = CStoreBeginRead(relation, tupdesc, columnList, NULL);
|
||||||
readState->relation = relation;
|
|
||||||
|
|
||||||
scan->cs_readState = readState;
|
scan->cs_readState = readState;
|
||||||
|
|
||||||
|
@ -438,12 +443,13 @@ cstore_relation_set_new_filenode(Relation rel,
|
||||||
MultiXactId *minmulti)
|
MultiXactId *minmulti)
|
||||||
{
|
{
|
||||||
SMgrRelation srel;
|
SMgrRelation srel;
|
||||||
|
CStoreOptions *options = CStoreTableAMGetOptions();
|
||||||
|
|
||||||
Assert(persistence == RELPERSISTENCE_PERMANENT);
|
Assert(persistence == RELPERSISTENCE_PERMANENT);
|
||||||
*freezeXid = RecentXmin;
|
*freezeXid = RecentXmin;
|
||||||
*minmulti = GetOldestMultiXactId();
|
*minmulti = GetOldestMultiXactId();
|
||||||
srel = RelationCreateStorage(*newrnode, persistence);
|
srel = RelationCreateStorage(*newrnode, persistence);
|
||||||
InitializeCStoreTableFile(rel->rd_id, rel, CStoreTableAMGetOptions());
|
InitCStoreDataFileMetadata(newrnode->relNode, options->blockRowCount);
|
||||||
smgrclose(srel);
|
smgrclose(srel);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -631,6 +637,8 @@ cstore_tableam_init()
|
||||||
{
|
{
|
||||||
PreviousExecutorEndHook = ExecutorEnd_hook;
|
PreviousExecutorEndHook = ExecutorEnd_hook;
|
||||||
ExecutorEnd_hook = CStoreExecutorEnd;
|
ExecutorEnd_hook = CStoreExecutorEnd;
|
||||||
|
prevObjectAccessHook = object_access_hook;
|
||||||
|
object_access_hook = CStoreTableAMObjectAccessHook;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -641,6 +649,79 @@ cstore_tableam_finish()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Implements object_access_hook. One of the places this is called is just
|
||||||
|
* before dropping an object, which allows us to clean-up resources for
|
||||||
|
* cstore tables.
|
||||||
|
*
|
||||||
|
* See the comments for CStoreFdwObjectAccessHook for more details.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
CStoreTableAMObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId, int
|
||||||
|
subId,
|
||||||
|
void *arg)
|
||||||
|
{
|
||||||
|
if (prevObjectAccessHook)
|
||||||
|
{
|
||||||
|
prevObjectAccessHook(access, classId, objectId, subId, arg);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Do nothing if this is not a DROP relation command.
|
||||||
|
*/
|
||||||
|
if (access != OAT_DROP || classId != RelationRelationId || OidIsValid(subId))
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Lock relation to prevent it from being dropped and to avoid
|
||||||
|
* race conditions in the next if block.
|
||||||
|
*/
|
||||||
|
LockRelationOid(objectId, AccessShareLock);
|
||||||
|
|
||||||
|
if (IsCStoreTableAmTable(objectId))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Drop metadata. No need to drop storage here since for
|
||||||
|
* tableam tables storage is managed by postgres.
|
||||||
|
*/
|
||||||
|
Relation rel = table_open(objectId, AccessExclusiveLock);
|
||||||
|
DeleteDataFileMetadataRowIfExists(rel->rd_node.relNode);
|
||||||
|
|
||||||
|
/* keep the lock since we did physical changes to the relation */
|
||||||
|
table_close(rel, NoLock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* IsCStoreTableAmTable returns true if relation has cstore_tableam
|
||||||
|
* access method. This can be called before extension creation.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
IsCStoreTableAmTable(Oid relationId)
|
||||||
|
{
|
||||||
|
bool result;
|
||||||
|
Relation rel;
|
||||||
|
|
||||||
|
if (!OidIsValid(relationId))
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Lock relation to prevent it from being dropped &
|
||||||
|
* avoid race conditions.
|
||||||
|
*/
|
||||||
|
rel = relation_open(relationId, AccessShareLock);
|
||||||
|
result = rel->rd_tableam == GetCstoreTableAmRoutine();
|
||||||
|
relation_close(rel, NoLock);
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static const TableAmRoutine cstore_am_methods = {
|
static const TableAmRoutine cstore_am_methods = {
|
||||||
.type = T_TableAmRoutine,
|
.type = T_TableAmRoutine,
|
||||||
|
|
||||||
|
|
|
@ -45,7 +45,7 @@ static void UpdateBlockSkipNodeMinMax(ColumnBlockSkipNode *blockSkipNode,
|
||||||
int columnTypeLength, Oid columnCollation,
|
int columnTypeLength, Oid columnCollation,
|
||||||
FmgrInfo *comparisonFunction);
|
FmgrInfo *comparisonFunction);
|
||||||
static Datum DatumCopy(Datum datum, bool datumTypeByValue, int datumTypeLength);
|
static Datum DatumCopy(Datum datum, bool datumTypeByValue, int datumTypeLength);
|
||||||
static void AppendStripeMetadata(TableMetadata *tableMetadata,
|
static void AppendStripeMetadata(DataFileMetadata *datafileMetadata,
|
||||||
StripeMetadata stripeMetadata);
|
StripeMetadata stripeMetadata);
|
||||||
static StringInfo CopyStringInfo(StringInfo sourceString);
|
static StringInfo CopyStringInfo(StringInfo sourceString);
|
||||||
|
|
||||||
|
@ -58,13 +58,13 @@ static StringInfo CopyStringInfo(StringInfo sourceString);
|
||||||
* will be added.
|
* will be added.
|
||||||
*/
|
*/
|
||||||
TableWriteState *
|
TableWriteState *
|
||||||
CStoreBeginWrite(Oid relationId,
|
CStoreBeginWrite(Relation relation,
|
||||||
CompressionType compressionType,
|
CompressionType compressionType,
|
||||||
uint64 stripeMaxRowCount, uint32 blockRowCount,
|
uint64 stripeMaxRowCount, uint32 blockRowCount,
|
||||||
TupleDesc tupleDescriptor)
|
TupleDesc tupleDescriptor)
|
||||||
{
|
{
|
||||||
TableWriteState *writeState = NULL;
|
TableWriteState *writeState = NULL;
|
||||||
TableMetadata *tableMetadata = NULL;
|
DataFileMetadata *datafileMetadata = NULL;
|
||||||
FmgrInfo **comparisonFunctionArray = NULL;
|
FmgrInfo **comparisonFunctionArray = NULL;
|
||||||
MemoryContext stripeWriteContext = NULL;
|
MemoryContext stripeWriteContext = NULL;
|
||||||
uint64 currentFileOffset = 0;
|
uint64 currentFileOffset = 0;
|
||||||
|
@ -73,19 +73,20 @@ CStoreBeginWrite(Oid relationId,
|
||||||
bool *columnMaskArray = NULL;
|
bool *columnMaskArray = NULL;
|
||||||
BlockData *blockData = NULL;
|
BlockData *blockData = NULL;
|
||||||
uint64 currentStripeId = 0;
|
uint64 currentStripeId = 0;
|
||||||
|
Oid relNode = relation->rd_node.relNode;
|
||||||
|
|
||||||
tableMetadata = ReadTableMetadata(relationId);
|
datafileMetadata = ReadDataFileMetadata(relNode);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If stripeMetadataList is not empty, jump to the position right after
|
* If stripeMetadataList is not empty, jump to the position right after
|
||||||
* the last position.
|
* the last position.
|
||||||
*/
|
*/
|
||||||
if (tableMetadata->stripeMetadataList != NIL)
|
if (datafileMetadata->stripeMetadataList != NIL)
|
||||||
{
|
{
|
||||||
StripeMetadata *lastStripe = NULL;
|
StripeMetadata *lastStripe = NULL;
|
||||||
uint64 lastStripeSize = 0;
|
uint64 lastStripeSize = 0;
|
||||||
|
|
||||||
lastStripe = llast(tableMetadata->stripeMetadataList);
|
lastStripe = llast(datafileMetadata->stripeMetadataList);
|
||||||
lastStripeSize += lastStripe->dataLength;
|
lastStripeSize += lastStripe->dataLength;
|
||||||
|
|
||||||
currentFileOffset = lastStripe->fileOffset + lastStripeSize;
|
currentFileOffset = lastStripe->fileOffset + lastStripeSize;
|
||||||
|
@ -127,8 +128,8 @@ CStoreBeginWrite(Oid relationId,
|
||||||
blockData = CreateEmptyBlockData(columnCount, columnMaskArray, blockRowCount);
|
blockData = CreateEmptyBlockData(columnCount, columnMaskArray, blockRowCount);
|
||||||
|
|
||||||
writeState = palloc0(sizeof(TableWriteState));
|
writeState = palloc0(sizeof(TableWriteState));
|
||||||
writeState->relationId = relationId;
|
writeState->relation = relation;
|
||||||
writeState->tableMetadata = tableMetadata;
|
writeState->datafileMetadata = datafileMetadata;
|
||||||
writeState->compressionType = compressionType;
|
writeState->compressionType = compressionType;
|
||||||
writeState->stripeMaxRowCount = stripeMaxRowCount;
|
writeState->stripeMaxRowCount = stripeMaxRowCount;
|
||||||
writeState->blockRowCount = blockRowCount;
|
writeState->blockRowCount = blockRowCount;
|
||||||
|
@ -163,7 +164,7 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul
|
||||||
StripeBuffers *stripeBuffers = writeState->stripeBuffers;
|
StripeBuffers *stripeBuffers = writeState->stripeBuffers;
|
||||||
StripeSkipList *stripeSkipList = writeState->stripeSkipList;
|
StripeSkipList *stripeSkipList = writeState->stripeSkipList;
|
||||||
uint32 columnCount = writeState->tupleDescriptor->natts;
|
uint32 columnCount = writeState->tupleDescriptor->natts;
|
||||||
TableMetadata *tableMetadata = writeState->tableMetadata;
|
DataFileMetadata *datafileMetadata = writeState->datafileMetadata;
|
||||||
const uint32 blockRowCount = writeState->blockRowCount;
|
const uint32 blockRowCount = writeState->blockRowCount;
|
||||||
BlockData *blockData = writeState->blockData;
|
BlockData *blockData = writeState->blockData;
|
||||||
MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext);
|
MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext);
|
||||||
|
@ -251,8 +252,9 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul
|
||||||
* doesn't free it.
|
* doesn't free it.
|
||||||
*/
|
*/
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
InsertStripeMetadataRow(writeState->relationId, &stripeMetadata);
|
InsertStripeMetadataRow(writeState->relation->rd_node.relNode,
|
||||||
AppendStripeMetadata(tableMetadata, stripeMetadata);
|
&stripeMetadata);
|
||||||
|
AppendStripeMetadata(datafileMetadata, stripeMetadata);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -280,12 +282,13 @@ CStoreEndWrite(TableWriteState *writeState)
|
||||||
MemoryContextReset(writeState->stripeWriteContext);
|
MemoryContextReset(writeState->stripeWriteContext);
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
InsertStripeMetadataRow(writeState->relationId, &stripeMetadata);
|
InsertStripeMetadataRow(writeState->relation->rd_node.relNode,
|
||||||
AppendStripeMetadata(writeState->tableMetadata, stripeMetadata);
|
&stripeMetadata);
|
||||||
|
AppendStripeMetadata(writeState->datafileMetadata, stripeMetadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
MemoryContextDelete(writeState->stripeWriteContext);
|
MemoryContextDelete(writeState->stripeWriteContext);
|
||||||
list_free_deep(writeState->tableMetadata->stripeMetadataList);
|
list_free_deep(writeState->datafileMetadata->stripeMetadataList);
|
||||||
pfree(writeState->comparisonFunctionArray);
|
pfree(writeState->comparisonFunctionArray);
|
||||||
FreeBlockData(writeState->blockData);
|
FreeBlockData(writeState->blockData);
|
||||||
pfree(writeState);
|
pfree(writeState);
|
||||||
|
@ -543,7 +546,8 @@ FlushStripe(TableWriteState *writeState)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* create skip list and footer buffers */
|
/* create skip list and footer buffers */
|
||||||
SaveStripeSkipList(writeState->relationId, writeState->currentStripeId,
|
SaveStripeSkipList(writeState->relation->rd_node.relNode,
|
||||||
|
writeState->currentStripeId,
|
||||||
stripeSkipList, tupleDescriptor);
|
stripeSkipList, tupleDescriptor);
|
||||||
|
|
||||||
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
|
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
|
||||||
|
@ -787,13 +791,13 @@ DatumCopy(Datum datum, bool datumTypeByValue, int datumTypeLength)
|
||||||
* table footer's stripeMetadataList.
|
* table footer's stripeMetadataList.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
AppendStripeMetadata(TableMetadata *tableMetadata, StripeMetadata stripeMetadata)
|
AppendStripeMetadata(DataFileMetadata *datafileMetadata, StripeMetadata stripeMetadata)
|
||||||
{
|
{
|
||||||
StripeMetadata *stripeMetadataCopy = palloc0(sizeof(StripeMetadata));
|
StripeMetadata *stripeMetadataCopy = palloc0(sizeof(StripeMetadata));
|
||||||
memcpy(stripeMetadataCopy, &stripeMetadata, sizeof(StripeMetadata));
|
memcpy(stripeMetadataCopy, &stripeMetadata, sizeof(StripeMetadata));
|
||||||
|
|
||||||
tableMetadata->stripeMetadataList = lappend(tableMetadata->stripeMetadataList,
|
datafileMetadata->stripeMetadataList = lappend(datafileMetadata->stripeMetadataList,
|
||||||
stripeMetadataCopy);
|
stripeMetadataCopy);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -12,14 +12,29 @@
|
||||||
-- 'postgres' directory is excluded from comparison to have the same result.
|
-- 'postgres' directory is excluded from comparison to have the same result.
|
||||||
-- store postgres database oid
|
-- store postgres database oid
|
||||||
SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset
|
SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset
|
||||||
|
SELECT count(*) AS cstore_data_files_before_drop FROM cstore.cstore_data_files \gset
|
||||||
-- DROP cstore_fdw tables
|
-- DROP cstore_fdw tables
|
||||||
DROP TABLE contestant;
|
DROP TABLE contestant;
|
||||||
DROP TABLE contestant_compressed;
|
DROP TABLE contestant_compressed;
|
||||||
|
-- make sure DROP deletes metadata
|
||||||
|
SELECT :cstore_data_files_before_drop - count(*) FROM cstore.cstore_data_files;
|
||||||
|
?column?
|
||||||
|
----------
|
||||||
|
2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- Create a cstore_fdw table under a schema and drop it.
|
-- Create a cstore_fdw table under a schema and drop it.
|
||||||
CREATE SCHEMA test_schema;
|
CREATE SCHEMA test_schema;
|
||||||
CREATE TABLE test_schema.test_table(data int) USING cstore_tableam;
|
CREATE TABLE test_schema.test_table(data int) USING cstore_tableam;
|
||||||
|
SELECT count(*) AS cstore_data_files_before_drop FROM cstore.cstore_data_files \gset
|
||||||
DROP SCHEMA test_schema CASCADE;
|
DROP SCHEMA test_schema CASCADE;
|
||||||
NOTICE: drop cascades to table test_schema.test_table
|
NOTICE: drop cascades to table test_schema.test_table
|
||||||
|
SELECT :cstore_data_files_before_drop - count(*) FROM cstore.cstore_data_files;
|
||||||
|
?column?
|
||||||
|
----------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
SELECT current_database() datname \gset
|
SELECT current_database() datname \gset
|
||||||
CREATE DATABASE db_to_drop;
|
CREATE DATABASE db_to_drop;
|
||||||
\c db_to_drop
|
\c db_to_drop
|
||||||
|
|
|
@ -12,14 +12,29 @@
|
||||||
-- 'postgres' directory is excluded from comparison to have the same result.
|
-- 'postgres' directory is excluded from comparison to have the same result.
|
||||||
-- store postgres database oid
|
-- store postgres database oid
|
||||||
SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset
|
SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset
|
||||||
|
SELECT count(*) AS cstore_data_files_before_drop FROM cstore.cstore_data_files \gset
|
||||||
-- DROP cstore_fdw tables
|
-- DROP cstore_fdw tables
|
||||||
DROP FOREIGN TABLE contestant;
|
DROP FOREIGN TABLE contestant;
|
||||||
DROP FOREIGN TABLE contestant_compressed;
|
DROP FOREIGN TABLE contestant_compressed;
|
||||||
|
-- make sure DROP deletes metadata
|
||||||
|
SELECT :cstore_data_files_before_drop - count(*) FROM cstore.cstore_data_files;
|
||||||
|
?column?
|
||||||
|
----------
|
||||||
|
2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- Create a cstore_fdw table under a schema and drop it.
|
-- Create a cstore_fdw table under a schema and drop it.
|
||||||
CREATE SCHEMA test_schema;
|
CREATE SCHEMA test_schema;
|
||||||
CREATE FOREIGN TABLE test_schema.test_table(data int) SERVER cstore_server;
|
CREATE FOREIGN TABLE test_schema.test_table(data int) SERVER cstore_server;
|
||||||
|
SELECT count(*) AS cstore_data_files_before_drop FROM cstore.cstore_data_files \gset
|
||||||
DROP SCHEMA test_schema CASCADE;
|
DROP SCHEMA test_schema CASCADE;
|
||||||
NOTICE: drop cascades to foreign table test_schema.test_table
|
NOTICE: drop cascades to foreign table test_schema.test_table
|
||||||
|
SELECT :cstore_data_files_before_drop - count(*) FROM cstore.cstore_data_files;
|
||||||
|
?column?
|
||||||
|
----------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
SELECT current_database() datname \gset
|
SELECT current_database() datname \gset
|
||||||
CREATE DATABASE db_to_drop;
|
CREATE DATABASE db_to_drop;
|
||||||
\c db_to_drop
|
\c db_to_drop
|
||||||
|
|
|
@ -15,14 +15,22 @@
|
||||||
-- store postgres database oid
|
-- store postgres database oid
|
||||||
SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset
|
SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset
|
||||||
|
|
||||||
|
SELECT count(*) AS cstore_data_files_before_drop FROM cstore.cstore_data_files \gset
|
||||||
|
|
||||||
-- DROP cstore_fdw tables
|
-- DROP cstore_fdw tables
|
||||||
DROP TABLE contestant;
|
DROP TABLE contestant;
|
||||||
DROP TABLE contestant_compressed;
|
DROP TABLE contestant_compressed;
|
||||||
|
|
||||||
|
-- make sure DROP deletes metadata
|
||||||
|
SELECT :cstore_data_files_before_drop - count(*) FROM cstore.cstore_data_files;
|
||||||
|
|
||||||
-- Create a cstore_fdw table under a schema and drop it.
|
-- Create a cstore_fdw table under a schema and drop it.
|
||||||
CREATE SCHEMA test_schema;
|
CREATE SCHEMA test_schema;
|
||||||
CREATE TABLE test_schema.test_table(data int) USING cstore_tableam;
|
CREATE TABLE test_schema.test_table(data int) USING cstore_tableam;
|
||||||
|
|
||||||
|
SELECT count(*) AS cstore_data_files_before_drop FROM cstore.cstore_data_files \gset
|
||||||
DROP SCHEMA test_schema CASCADE;
|
DROP SCHEMA test_schema CASCADE;
|
||||||
|
SELECT :cstore_data_files_before_drop - count(*) FROM cstore.cstore_data_files;
|
||||||
|
|
||||||
SELECT current_database() datname \gset
|
SELECT current_database() datname \gset
|
||||||
|
|
||||||
|
|
|
@ -15,14 +15,22 @@
|
||||||
-- store postgres database oid
|
-- store postgres database oid
|
||||||
SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset
|
SELECT oid postgres_oid FROM pg_database WHERE datname = 'postgres' \gset
|
||||||
|
|
||||||
|
SELECT count(*) AS cstore_data_files_before_drop FROM cstore.cstore_data_files \gset
|
||||||
|
|
||||||
-- DROP cstore_fdw tables
|
-- DROP cstore_fdw tables
|
||||||
DROP FOREIGN TABLE contestant;
|
DROP FOREIGN TABLE contestant;
|
||||||
DROP FOREIGN TABLE contestant_compressed;
|
DROP FOREIGN TABLE contestant_compressed;
|
||||||
|
|
||||||
|
-- make sure DROP deletes metadata
|
||||||
|
SELECT :cstore_data_files_before_drop - count(*) FROM cstore.cstore_data_files;
|
||||||
|
|
||||||
-- Create a cstore_fdw table under a schema and drop it.
|
-- Create a cstore_fdw table under a schema and drop it.
|
||||||
CREATE SCHEMA test_schema;
|
CREATE SCHEMA test_schema;
|
||||||
CREATE FOREIGN TABLE test_schema.test_table(data int) SERVER cstore_server;
|
CREATE FOREIGN TABLE test_schema.test_table(data int) SERVER cstore_server;
|
||||||
|
|
||||||
|
SELECT count(*) AS cstore_data_files_before_drop FROM cstore.cstore_data_files \gset
|
||||||
DROP SCHEMA test_schema CASCADE;
|
DROP SCHEMA test_schema CASCADE;
|
||||||
|
SELECT :cstore_data_files_before_drop - count(*) FROM cstore.cstore_data_files;
|
||||||
|
|
||||||
SELECT current_database() datname \gset
|
SELECT current_database() datname \gset
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue