mirror of https://github.com/citusdata/citus.git
Associate metadata with rel filenode
parent
207eedc35a
commit
cf0ba6103e
23
cstore.c
23
cstore.c
|
@ -102,26 +102,11 @@ ParseCompressionType(const char *compressionTypeString)
|
|||
|
||||
|
||||
/*
|
||||
* InitializeCStoreTableFile creates data and footer file for a cstore table.
|
||||
* The function assumes data and footer files do not exist, therefore
|
||||
* it should be called on empty or non-existing table. Notice that the caller
|
||||
* is expected to acquire AccessExclusiveLock on the relation.
|
||||
* InitializeCStoreTableFile initializes metadata for the given relation
|
||||
* file node.
|
||||
*/
|
||||
void
|
||||
InitializeCStoreTableFile(Oid relationId, Relation relation, CStoreOptions *cstoreOptions)
|
||||
InitializeCStoreTableFile(Oid relNode, CStoreOptions *cstoreOptions)
|
||||
{
|
||||
TableWriteState *writeState = NULL;
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(relation);
|
||||
|
||||
InitCStoreTableMetadata(relationId, cstoreOptions->blockRowCount);
|
||||
|
||||
/*
|
||||
* Initialize state to write to the cstore file. This creates an
|
||||
* empty data file and a valid footer file for the table.
|
||||
*/
|
||||
writeState = CStoreBeginWrite(relationId,
|
||||
cstoreOptions->compressionType,
|
||||
cstoreOptions->stripeRowCount,
|
||||
cstoreOptions->blockRowCount, tupleDescriptor);
|
||||
CStoreEndWrite(writeState);
|
||||
InitCStoreTableMetadata(relNode, cstoreOptions->blockRowCount);
|
||||
}
|
||||
|
|
26
cstore.h
26
cstore.h
|
@ -16,7 +16,9 @@
|
|||
|
||||
#include "fmgr.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "nodes/parsenodes.h"
|
||||
#include "storage/bufpage.h"
|
||||
#include "storage/lockdefs.h"
|
||||
#include "utils/relcache.h"
|
||||
|
||||
/* Defines for valid option names */
|
||||
|
@ -190,8 +192,6 @@ typedef struct StripeBuffers
|
|||
/* TableReadState represents state of a cstore file read operation. */
|
||||
typedef struct TableReadState
|
||||
{
|
||||
Oid relationId;
|
||||
|
||||
TableMetadata *tableMetadata;
|
||||
StripeMetadata *currentStripeMetadata;
|
||||
TupleDesc tupleDescriptor;
|
||||
|
@ -217,7 +217,6 @@ typedef struct TableReadState
|
|||
/* TableWriteState represents state of a cstore file write operation. */
|
||||
typedef struct TableWriteState
|
||||
{
|
||||
Oid relationId;
|
||||
TableMetadata *tableMetadata;
|
||||
CompressionType compressionType;
|
||||
TupleDesc tupleDescriptor;
|
||||
|
@ -249,11 +248,12 @@ extern int cstore_block_row_count;
|
|||
extern void cstore_init(void);
|
||||
|
||||
extern CompressionType ParseCompressionType(const char *compressionTypeString);
|
||||
extern void InitializeCStoreTableFile(Oid relationId, Relation relation,
|
||||
CStoreOptions *cstoreOptions);
|
||||
extern void InitializeCStoreTableFile(Oid relNode, CStoreOptions *cstoreOptions);
|
||||
extern bool IsCStoreFdwTable(Oid relationId);
|
||||
extern Relation cstore_fdw_open(Oid relationId, LOCKMODE lockmode);
|
||||
|
||||
/* Function declarations for writing to a cstore file */
|
||||
extern TableWriteState * CStoreBeginWrite(Oid relationId,
|
||||
extern TableWriteState * CStoreBeginWrite(Relation relation,
|
||||
CompressionType compressionType,
|
||||
uint64 stripeMaxRowCount,
|
||||
uint32 blockRowCount,
|
||||
|
@ -263,7 +263,7 @@ extern void CStoreWriteRow(TableWriteState *state, Datum *columnValues,
|
|||
extern void CStoreEndWrite(TableWriteState *state);
|
||||
|
||||
/* Function declarations for reading from a cstore file */
|
||||
extern TableReadState * CStoreBeginRead(Oid relationId,
|
||||
extern TableReadState * CStoreBeginRead(Relation relation,
|
||||
TupleDesc tupleDescriptor,
|
||||
List *projectedColumnList, List *qualConditions);
|
||||
extern bool CStoreReadFinished(TableReadState *state);
|
||||
|
@ -283,12 +283,14 @@ extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer,
|
|||
extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType);
|
||||
|
||||
/* cstore_metadata_tables.c */
|
||||
extern void InitCStoreTableMetadata(Oid relid, int blockRowCount);
|
||||
extern void InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe);
|
||||
extern TableMetadata * ReadTableMetadata(Oid relid);
|
||||
extern void SaveStripeSkipList(Oid relid, uint64 stripe, StripeSkipList *stripeSkipList,
|
||||
extern void DeleteTableMetadataRowIfExists(Oid relfilenode);
|
||||
extern void InitCStoreTableMetadata(Oid relfilenode, int blockRowCount);
|
||||
extern void InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe);
|
||||
extern TableMetadata * ReadTableMetadata(Oid relfilenode);
|
||||
extern void SaveStripeSkipList(Oid relfilenode, uint64 stripe,
|
||||
StripeSkipList *stripeSkipList,
|
||||
TupleDesc tupleDescriptor);
|
||||
extern StripeSkipList * ReadStripeSkipList(Oid relid, uint64 stripe,
|
||||
extern StripeSkipList * ReadStripeSkipList(Oid relfilenode, uint64 stripe,
|
||||
TupleDesc tupleDescriptor,
|
||||
uint32 blockCount);
|
||||
|
||||
|
|
|
@ -32,17 +32,17 @@ AS 'MODULE_PATHNAME'
|
|||
LANGUAGE C STRICT;
|
||||
|
||||
CREATE TABLE cstore_tables (
|
||||
relid oid NOT NULL,
|
||||
relfilenode oid NOT NULL,
|
||||
block_row_count int NOT NULL,
|
||||
version_major bigint NOT NULL,
|
||||
version_minor bigint NOT NULL,
|
||||
PRIMARY KEY (relid)
|
||||
PRIMARY KEY (relfilenode)
|
||||
) WITH (user_catalog_table = true);
|
||||
|
||||
COMMENT ON TABLE cstore_tables IS 'CStore table wide metadata';
|
||||
|
||||
CREATE TABLE cstore_stripes (
|
||||
relid oid NOT NULL,
|
||||
relfilenode oid NOT NULL,
|
||||
stripe bigint NOT NULL,
|
||||
file_offset bigint NOT NULL,
|
||||
data_length bigint NOT NULL,
|
||||
|
@ -50,14 +50,14 @@ CREATE TABLE cstore_stripes (
|
|||
block_count int NOT NULL,
|
||||
block_row_count int NOT NULL,
|
||||
row_count bigint NOT NULL,
|
||||
PRIMARY KEY (relid, stripe),
|
||||
FOREIGN KEY (relid) REFERENCES cstore_tables(relid) ON DELETE CASCADE INITIALLY DEFERRED
|
||||
PRIMARY KEY (relfilenode, stripe),
|
||||
FOREIGN KEY (relfilenode) REFERENCES cstore_tables(relfilenode) ON DELETE CASCADE INITIALLY DEFERRED
|
||||
) WITH (user_catalog_table = true);
|
||||
|
||||
COMMENT ON TABLE cstore_tables IS 'CStore per stripe metadata';
|
||||
|
||||
CREATE TABLE cstore_skipnodes (
|
||||
relid oid NOT NULL,
|
||||
relfilenode oid NOT NULL,
|
||||
stripe bigint NOT NULL,
|
||||
attr int NOT NULL,
|
||||
block int NOT NULL,
|
||||
|
@ -69,8 +69,8 @@ CREATE TABLE cstore_skipnodes (
|
|||
exists_stream_offset bigint NOT NULL,
|
||||
exists_stream_length bigint NOT NULL,
|
||||
value_compression_type int NOT NULL,
|
||||
PRIMARY KEY (relid, stripe, attr, block),
|
||||
FOREIGN KEY (relid, stripe) REFERENCES cstore_stripes(relid, stripe) ON DELETE CASCADE INITIALLY DEFERRED
|
||||
PRIMARY KEY (relfilenode, stripe, attr, block),
|
||||
FOREIGN KEY (relfilenode, stripe) REFERENCES cstore_stripes(relfilenode, stripe) ON DELETE CASCADE INITIALLY DEFERRED
|
||||
) WITH (user_catalog_table = true);
|
||||
|
||||
COMMENT ON TABLE cstore_tables IS 'CStore per block metadata';
|
||||
|
|
47
cstore_fdw.c
47
cstore_fdw.c
|
@ -131,7 +131,6 @@ static List * FindCStoreTables(List *tableList);
|
|||
static List * OpenRelationsForTruncate(List *cstoreTableList);
|
||||
static void FdwNewRelFileNode(Relation relation);
|
||||
static void TruncateCStoreTables(List *cstoreRelationList);
|
||||
static bool CStoreTable(Oid relationId);
|
||||
static bool CStoreServer(ForeignServer *server);
|
||||
static bool DistributedTable(Oid relationId);
|
||||
static bool DistributedWorkerCopy(CopyStmt *copyStatement);
|
||||
|
@ -189,7 +188,6 @@ static bool CStoreIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
|
|||
RangeTblEntry *rte);
|
||||
#endif
|
||||
static void cstore_fdw_initrel(Relation rel);
|
||||
static Relation cstore_fdw_open(Oid relationId, LOCKMODE lockmode);
|
||||
static Relation cstore_fdw_openrv(RangeVar *relation, LOCKMODE lockmode);
|
||||
|
||||
PG_FUNCTION_INFO_V1(cstore_ddl_event_end_trigger);
|
||||
|
@ -267,7 +265,8 @@ cstore_ddl_event_end_trigger(PG_FUNCTION_ARGS)
|
|||
* We have no chance to hook into server creation to create data
|
||||
* directory for it during database creation time.
|
||||
*/
|
||||
InitializeCStoreTableFile(relationId, relation, CStoreGetOptions(relationId));
|
||||
InitializeCStoreTableFile(relation->rd_node.relNode,
|
||||
CStoreGetOptions(relationId));
|
||||
heap_close(relation, AccessExclusiveLock);
|
||||
}
|
||||
}
|
||||
|
@ -403,7 +402,7 @@ CopyCStoreTableStatement(CopyStmt *copyStatement)
|
|||
{
|
||||
Oid relationId = RangeVarGetRelid(copyStatement->relation,
|
||||
AccessShareLock, true);
|
||||
bool cstoreTable = CStoreTable(relationId);
|
||||
bool cstoreTable = IsCStoreFdwTable(relationId);
|
||||
if (cstoreTable)
|
||||
{
|
||||
bool distributedTable = DistributedTable(relationId);
|
||||
|
@ -558,12 +557,11 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString)
|
|||
#endif
|
||||
|
||||
/* init state to write to the cstore file */
|
||||
writeState = CStoreBeginWrite(relationId,
|
||||
writeState = CStoreBeginWrite(relation,
|
||||
cstoreOptions->compressionType,
|
||||
cstoreOptions->stripeRowCount,
|
||||
cstoreOptions->blockRowCount,
|
||||
tupleDescriptor);
|
||||
writeState->relation = relation;
|
||||
|
||||
while (nextRowFound)
|
||||
{
|
||||
|
@ -686,7 +684,7 @@ CStoreProcessAlterTableCommand(AlterTableStmt *alterStatement)
|
|||
}
|
||||
|
||||
relationId = RangeVarGetRelid(relationRangeVar, AccessShareLock, true);
|
||||
if (!CStoreTable(relationId))
|
||||
if (!IsCStoreFdwTable(relationId))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
@ -765,7 +763,7 @@ FindCStoreTables(List *tableList)
|
|||
{
|
||||
RangeVar *rangeVar = (RangeVar *) lfirst(relationCell);
|
||||
Oid relationId = RangeVarGetRelid(rangeVar, AccessShareLock, true);
|
||||
if (CStoreTable(relationId) && !DistributedTable(relationId))
|
||||
if (IsCStoreFdwTable(relationId) && !DistributedTable(relationId))
|
||||
{
|
||||
cstoreTableList = lappend(cstoreTableList, rangeVar);
|
||||
}
|
||||
|
@ -825,10 +823,11 @@ TruncateCStoreTables(List *cstoreRelationList)
|
|||
Relation relation = (Relation) lfirst(relationCell);
|
||||
Oid relationId = relation->rd_id;
|
||||
|
||||
Assert(CStoreTable(relationId));
|
||||
Assert(IsCStoreFdwTable(relationId));
|
||||
|
||||
FdwNewRelFileNode(relation);
|
||||
InitializeCStoreTableFile(relationId, relation, CStoreGetOptions(relationId));
|
||||
InitializeCStoreTableFile(relation->rd_node.relNode,
|
||||
CStoreGetOptions(relationId));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -861,7 +860,6 @@ FdwNewRelFileNode(Relation relation)
|
|||
Relation tmprel;
|
||||
Oid tablespace;
|
||||
Oid filenode;
|
||||
RelFileNode newrnode;
|
||||
|
||||
/*
|
||||
* Upgrade to AccessExclusiveLock, and hold until the end of the
|
||||
|
@ -887,10 +885,6 @@ FdwNewRelFileNode(Relation relation)
|
|||
|
||||
filenode = GetNewRelFileNode(tablespace, NULL, persistence);
|
||||
|
||||
newrnode.spcNode = tablespace;
|
||||
newrnode.dbNode = MyDatabaseId;
|
||||
newrnode.relNode = filenode;
|
||||
|
||||
classform->relfilenode = filenode;
|
||||
classform->relpages = 0; /* it's empty until further notice */
|
||||
classform->reltuples = 0;
|
||||
|
@ -900,6 +894,10 @@ FdwNewRelFileNode(Relation relation)
|
|||
|
||||
CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
|
||||
CommandCounterIncrement();
|
||||
|
||||
relation->rd_node.spcNode = tablespace;
|
||||
relation->rd_node.dbNode = MyDatabaseId;
|
||||
relation->rd_node.relNode = filenode;
|
||||
}
|
||||
|
||||
heap_freetuple(tuple);
|
||||
|
@ -928,11 +926,11 @@ FdwCreateStorage(Relation relation)
|
|||
|
||||
|
||||
/*
|
||||
* CStoreTable checks if the given table name belongs to a foreign columnar store
|
||||
* IsCStoreFdwTable checks if the given table name belongs to a foreign columnar store
|
||||
* table. If it does, the function returns true. Otherwise, it returns false.
|
||||
*/
|
||||
static bool
|
||||
CStoreTable(Oid relationId)
|
||||
bool
|
||||
IsCStoreFdwTable(Oid relationId)
|
||||
{
|
||||
bool cstoreTable = false;
|
||||
char relationKind = 0;
|
||||
|
@ -1055,7 +1053,7 @@ Datum
|
|||
cstore_table_size(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid relationId = PG_GETARG_OID(0);
|
||||
bool cstoreTable = CStoreTable(relationId);
|
||||
bool cstoreTable = IsCStoreFdwTable(relationId);
|
||||
Relation relation;
|
||||
BlockNumber nblocks;
|
||||
|
||||
|
@ -1705,6 +1703,7 @@ CStoreBeginForeignScan(ForeignScanState *scanState, int executorFlags)
|
|||
ForeignScan *foreignScan = NULL;
|
||||
List *foreignPrivateList = NIL;
|
||||
List *whereClauseList = NIL;
|
||||
Relation relation = NULL;
|
||||
|
||||
cstore_fdw_initrel(currentRelation);
|
||||
|
||||
|
@ -1721,9 +1720,8 @@ CStoreBeginForeignScan(ForeignScanState *scanState, int executorFlags)
|
|||
whereClauseList = foreignScan->scan.plan.qual;
|
||||
|
||||
columnList = (List *) linitial(foreignPrivateList);
|
||||
readState = CStoreBeginRead(foreignTableId,
|
||||
tupleDescriptor, columnList, whereClauseList);
|
||||
readState->relation = cstore_fdw_open(foreignTableId, AccessShareLock);
|
||||
relation = cstore_fdw_open(foreignTableId, AccessShareLock);
|
||||
readState = CStoreBeginRead(relation, tupleDescriptor, columnList, whereClauseList);
|
||||
|
||||
scanState->fdw_state = (void *) readState;
|
||||
}
|
||||
|
@ -2067,13 +2065,12 @@ CStoreBeginForeignInsert(ModifyTableState *modifyTableState, ResultRelInfo *rela
|
|||
cstoreOptions = CStoreGetOptions(foreignTableOid);
|
||||
tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc);
|
||||
|
||||
writeState = CStoreBeginWrite(foreignTableOid,
|
||||
writeState = CStoreBeginWrite(relation,
|
||||
cstoreOptions->compressionType,
|
||||
cstoreOptions->stripeRowCount,
|
||||
cstoreOptions->blockRowCount,
|
||||
tupleDescriptor);
|
||||
|
||||
writeState->relation = relation;
|
||||
relationInfo->ri_FdwState = (void *) writeState;
|
||||
}
|
||||
|
||||
|
@ -2196,7 +2193,7 @@ cstore_fdw_initrel(Relation rel)
|
|||
}
|
||||
|
||||
|
||||
static Relation
|
||||
Relation
|
||||
cstore_fdw_open(Oid relationId, LOCKMODE lockmode)
|
||||
{
|
||||
Relation rel = heap_open(relationId, lockmode);
|
||||
|
|
|
@ -50,8 +50,7 @@ static Oid CStoreTablesIndexRelationId(void);
|
|||
static Oid CStoreSkipNodesRelationId(void);
|
||||
static Oid CStoreSkipNodesIndexRelationId(void);
|
||||
static Oid CStoreNamespaceId(void);
|
||||
static int TableBlockRowCount(Oid relid);
|
||||
static void DeleteTableMetadataRowIfExists(Oid relid);
|
||||
static bool ReadCStoreTables(Oid relfilenode, uint64 *blockRowCount);
|
||||
static ModifyState * StartModifyRelation(Relation rel);
|
||||
static void InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values,
|
||||
bool *nulls);
|
||||
|
@ -63,14 +62,14 @@ static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm);
|
|||
|
||||
/* constants for cstore_table */
|
||||
#define Natts_cstore_tables 4
|
||||
#define Anum_cstore_tables_relid 1
|
||||
#define Anum_cstore_tables_relfilenode 1
|
||||
#define Anum_cstore_tables_block_row_count 2
|
||||
#define Anum_cstore_tables_version_major 3
|
||||
#define Anum_cstore_tables_version_minor 4
|
||||
|
||||
/* constants for cstore_stripe */
|
||||
#define Natts_cstore_stripes 8
|
||||
#define Anum_cstore_stripes_relid 1
|
||||
#define Anum_cstore_stripes_relfilenode 1
|
||||
#define Anum_cstore_stripes_stripe 2
|
||||
#define Anum_cstore_stripes_file_offset 3
|
||||
#define Anum_cstore_stripes_data_length 4
|
||||
|
@ -81,7 +80,7 @@ static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm);
|
|||
|
||||
/* constants for cstore_skipnodes */
|
||||
#define Natts_cstore_skipnodes 12
|
||||
#define Anum_cstore_skipnodes_relid 1
|
||||
#define Anum_cstore_skipnodes_relfilenode 1
|
||||
#define Anum_cstore_skipnodes_stripe 2
|
||||
#define Anum_cstore_skipnodes_attr 3
|
||||
#define Anum_cstore_skipnodes_block 4
|
||||
|
@ -99,7 +98,7 @@ static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm);
|
|||
* InitCStoreTableMetadata adds a record for the given relation in cstore_table.
|
||||
*/
|
||||
void
|
||||
InitCStoreTableMetadata(Oid relid, int blockRowCount)
|
||||
InitCStoreTableMetadata(Oid relfilenode, int blockRowCount)
|
||||
{
|
||||
Oid cstoreTablesOid = InvalidOid;
|
||||
Relation cstoreTables = NULL;
|
||||
|
@ -107,13 +106,13 @@ InitCStoreTableMetadata(Oid relid, int blockRowCount)
|
|||
|
||||
bool nulls[Natts_cstore_tables] = { 0 };
|
||||
Datum values[Natts_cstore_tables] = {
|
||||
ObjectIdGetDatum(relid),
|
||||
ObjectIdGetDatum(relfilenode),
|
||||
Int32GetDatum(blockRowCount),
|
||||
Int32GetDatum(CSTORE_VERSION_MAJOR),
|
||||
Int32GetDatum(CSTORE_VERSION_MINOR)
|
||||
};
|
||||
|
||||
DeleteTableMetadataRowIfExists(relid);
|
||||
DeleteTableMetadataRowIfExists(relfilenode);
|
||||
|
||||
cstoreTablesOid = CStoreTablesRelationId();
|
||||
cstoreTables = heap_open(cstoreTablesOid, RowExclusiveLock);
|
||||
|
@ -133,7 +132,7 @@ InitCStoreTableMetadata(Oid relid, int blockRowCount)
|
|||
* of cstore_skipnodes.
|
||||
*/
|
||||
void
|
||||
SaveStripeSkipList(Oid relid, uint64 stripe, StripeSkipList *stripeSkipList,
|
||||
SaveStripeSkipList(Oid relfilenode, uint64 stripe, StripeSkipList *stripeSkipList,
|
||||
TupleDesc tupleDescriptor)
|
||||
{
|
||||
uint32 columnIndex = 0;
|
||||
|
@ -155,7 +154,7 @@ SaveStripeSkipList(Oid relid, uint64 stripe, StripeSkipList *stripeSkipList,
|
|||
&stripeSkipList->blockSkipNodeArray[columnIndex][blockIndex];
|
||||
|
||||
Datum values[Natts_cstore_skipnodes] = {
|
||||
ObjectIdGetDatum(relid),
|
||||
ObjectIdGetDatum(relfilenode),
|
||||
Int64GetDatum(stripe),
|
||||
Int32GetDatum(columnIndex + 1),
|
||||
Int32GetDatum(blockIndex),
|
||||
|
@ -201,7 +200,7 @@ SaveStripeSkipList(Oid relid, uint64 stripe, StripeSkipList *stripeSkipList,
|
|||
* ReadStripeSkipList fetches StripeSkipList for a given stripe.
|
||||
*/
|
||||
StripeSkipList *
|
||||
ReadStripeSkipList(Oid relid, uint64 stripe, TupleDesc tupleDescriptor,
|
||||
ReadStripeSkipList(Oid relfilenode, uint64 stripe, TupleDesc tupleDescriptor,
|
||||
uint32 blockCount)
|
||||
{
|
||||
StripeSkipList *skipList = NULL;
|
||||
|
@ -218,8 +217,8 @@ ReadStripeSkipList(Oid relid, uint64 stripe, TupleDesc tupleDescriptor,
|
|||
cstoreSkipNodes = heap_open(cstoreSkipNodesOid, AccessShareLock);
|
||||
index = index_open(CStoreSkipNodesIndexRelationId(), AccessShareLock);
|
||||
|
||||
ScanKeyInit(&scanKey[0], Anum_cstore_skipnodes_relid,
|
||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relid));
|
||||
ScanKeyInit(&scanKey[0], Anum_cstore_skipnodes_relfilenode,
|
||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relfilenode));
|
||||
ScanKeyInit(&scanKey[1], Anum_cstore_skipnodes_stripe,
|
||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe));
|
||||
|
||||
|
@ -311,11 +310,11 @@ ReadStripeSkipList(Oid relid, uint64 stripe, TupleDesc tupleDescriptor,
|
|||
* InsertStripeMetadataRow adds a row to cstore_stripes.
|
||||
*/
|
||||
void
|
||||
InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe)
|
||||
InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe)
|
||||
{
|
||||
bool nulls[Natts_cstore_stripes] = { 0 };
|
||||
Datum values[Natts_cstore_stripes] = {
|
||||
ObjectIdGetDatum(relid),
|
||||
ObjectIdGetDatum(relfilenode),
|
||||
Int64GetDatum(stripe->id),
|
||||
Int64GetDatum(stripe->fileOffset),
|
||||
Int64GetDatum(stripe->dataLength),
|
||||
|
@ -339,11 +338,11 @@ InsertStripeMetadataRow(Oid relid, StripeMetadata *stripe)
|
|||
|
||||
|
||||
/*
|
||||
* ReadTableMetadata constructs TableMetadata for a given relid by reading
|
||||
* ReadTableMetadata constructs TableMetadata for a given relfilenode by reading
|
||||
* from cstore_tables and cstore_stripes.
|
||||
*/
|
||||
TableMetadata *
|
||||
ReadTableMetadata(Oid relid)
|
||||
ReadTableMetadata(Oid relfilenode)
|
||||
{
|
||||
Oid cstoreStripesOid = InvalidOid;
|
||||
Relation cstoreStripes = NULL;
|
||||
|
@ -352,12 +351,18 @@ ReadTableMetadata(Oid relid)
|
|||
ScanKeyData scanKey[1];
|
||||
SysScanDesc scanDescriptor = NULL;
|
||||
HeapTuple heapTuple;
|
||||
bool found = false;
|
||||
|
||||
TableMetadata *tableMetadata = palloc0(sizeof(TableMetadata));
|
||||
tableMetadata->blockRowCount = TableBlockRowCount(relid);
|
||||
found = ReadCStoreTables(relfilenode, &tableMetadata->blockRowCount);
|
||||
if (!found)
|
||||
{
|
||||
ereport(ERROR, (errmsg("Relfilenode %d doesn't belong to a cstore table.",
|
||||
relfilenode)));
|
||||
}
|
||||
|
||||
ScanKeyInit(&scanKey[0], Anum_cstore_stripes_relid,
|
||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relid));
|
||||
ScanKeyInit(&scanKey[0], Anum_cstore_stripes_relfilenode,
|
||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relfilenode));
|
||||
|
||||
cstoreStripesOid = CStoreStripesRelationId();
|
||||
cstoreStripes = heap_open(cstoreStripesOid, AccessShareLock);
|
||||
|
@ -402,12 +407,13 @@ ReadTableMetadata(Oid relid)
|
|||
|
||||
|
||||
/*
|
||||
* TableBlockRowCount returns block_row_count column from cstore_tables for a given relid.
|
||||
* ReadCStoreTables reads corresponding record from cstore_tables. Returns false if
|
||||
* table was not found in cstore_tables.
|
||||
*/
|
||||
static int
|
||||
TableBlockRowCount(Oid relid)
|
||||
static bool
|
||||
ReadCStoreTables(Oid relfilenode, uint64 *blockRowCount)
|
||||
{
|
||||
int blockRowCount = 0;
|
||||
bool found = false;
|
||||
Oid cstoreTablesOid = InvalidOid;
|
||||
Relation cstoreTables = NULL;
|
||||
Relation index = NULL;
|
||||
|
@ -416,12 +422,29 @@ TableBlockRowCount(Oid relid)
|
|||
SysScanDesc scanDescriptor = NULL;
|
||||
HeapTuple heapTuple = NULL;
|
||||
|
||||
ScanKeyInit(&scanKey[0], Anum_cstore_tables_relid,
|
||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relid));
|
||||
ScanKeyInit(&scanKey[0], Anum_cstore_tables_relfilenode,
|
||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relfilenode));
|
||||
|
||||
cstoreTablesOid = CStoreTablesRelationId();
|
||||
cstoreTables = heap_open(cstoreTablesOid, AccessShareLock);
|
||||
index = index_open(CStoreTablesIndexRelationId(), AccessShareLock);
|
||||
cstoreTables = try_relation_open(cstoreTablesOid, AccessShareLock);
|
||||
if (cstoreTables == NULL)
|
||||
{
|
||||
/*
|
||||
* Extension has been dropped. This can be called while
|
||||
* dropping extension or database via ObjectAccess().
|
||||
*/
|
||||
return false;
|
||||
}
|
||||
|
||||
index = try_relation_open(CStoreTablesIndexRelationId(), AccessShareLock);
|
||||
if (index == NULL)
|
||||
{
|
||||
heap_close(cstoreTables, NoLock);
|
||||
|
||||
/* extension has been dropped */
|
||||
return false;
|
||||
}
|
||||
|
||||
tupleDescriptor = RelationGetDescr(cstoreTables);
|
||||
|
||||
scanDescriptor = systable_beginscan_ordered(cstoreTables, index, NULL, 1, scanKey);
|
||||
|
@ -432,22 +455,24 @@ TableBlockRowCount(Oid relid)
|
|||
Datum datumArray[Natts_cstore_tables];
|
||||
bool isNullArray[Natts_cstore_tables];
|
||||
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
||||
blockRowCount = DatumGetInt32(datumArray[Anum_cstore_tables_block_row_count - 1]);
|
||||
*blockRowCount = DatumGetInt32(datumArray[Anum_cstore_tables_block_row_count -
|
||||
1]);
|
||||
found = true;
|
||||
}
|
||||
|
||||
systable_endscan_ordered(scanDescriptor);
|
||||
index_close(index, NoLock);
|
||||
heap_close(cstoreTables, NoLock);
|
||||
|
||||
return blockRowCount;
|
||||
return found;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DeleteTableMetadataRowIfExists removes the row with given relid from cstore_stripes.
|
||||
* DeleteTableMetadataRowIfExists removes the row with given relfilenode from cstore_stripes.
|
||||
*/
|
||||
static void
|
||||
DeleteTableMetadataRowIfExists(Oid relid)
|
||||
void
|
||||
DeleteTableMetadataRowIfExists(Oid relfilenode)
|
||||
{
|
||||
Oid cstoreTablesOid = InvalidOid;
|
||||
Relation cstoreTables = NULL;
|
||||
|
@ -456,11 +481,17 @@ DeleteTableMetadataRowIfExists(Oid relid)
|
|||
SysScanDesc scanDescriptor = NULL;
|
||||
HeapTuple heapTuple = NULL;
|
||||
|
||||
ScanKeyInit(&scanKey[0], Anum_cstore_tables_relid,
|
||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relid));
|
||||
ScanKeyInit(&scanKey[0], Anum_cstore_tables_relfilenode,
|
||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relfilenode));
|
||||
|
||||
cstoreTablesOid = CStoreTablesRelationId();
|
||||
cstoreTables = heap_open(cstoreTablesOid, AccessShareLock);
|
||||
cstoreTables = try_relation_open(cstoreTablesOid, AccessShareLock);
|
||||
if (cstoreTables == NULL)
|
||||
{
|
||||
/* extension has been dropped */
|
||||
return;
|
||||
}
|
||||
|
||||
index = index_open(CStoreTablesIndexRelationId(), AccessShareLock);
|
||||
|
||||
scanDescriptor = systable_beginscan_ordered(cstoreTables, index, NULL, 1, scanKey);
|
||||
|
|
|
@ -80,14 +80,15 @@ static StringInfo ReadFromSmgr(Relation rel, uint64 offset, uint32 size);
|
|||
* read handle that's used during reading rows and finishing the read operation.
|
||||
*/
|
||||
TableReadState *
|
||||
CStoreBeginRead(Oid relationId, TupleDesc tupleDescriptor,
|
||||
CStoreBeginRead(Relation relation, TupleDesc tupleDescriptor,
|
||||
List *projectedColumnList, List *whereClauseList)
|
||||
{
|
||||
TableReadState *readState = NULL;
|
||||
TableMetadata *tableMetadata = NULL;
|
||||
MemoryContext stripeReadContext = NULL;
|
||||
Oid relNode = relation->rd_node.relNode;
|
||||
|
||||
tableMetadata = ReadTableMetadata(relationId);
|
||||
tableMetadata = ReadTableMetadata(relNode);
|
||||
|
||||
/*
|
||||
* We allocate all stripe specific data in the stripeReadContext, and reset
|
||||
|
@ -99,7 +100,7 @@ CStoreBeginRead(Oid relationId, TupleDesc tupleDescriptor,
|
|||
ALLOCSET_DEFAULT_SIZES);
|
||||
|
||||
readState = palloc0(sizeof(TableReadState));
|
||||
readState->relationId = relationId;
|
||||
readState->relation = relation;
|
||||
readState->tableMetadata = tableMetadata;
|
||||
readState->projectedColumnList = projectedColumnList;
|
||||
readState->whereClauseList = whereClauseList;
|
||||
|
@ -308,7 +309,7 @@ CStoreTableRowCount(Relation relation)
|
|||
ListCell *stripeMetadataCell = NULL;
|
||||
uint64 totalRowCount = 0;
|
||||
|
||||
tableMetadata = ReadTableMetadata(relation->rd_id);
|
||||
tableMetadata = ReadTableMetadata(relation->rd_node.relNode);
|
||||
|
||||
foreach(stripeMetadataCell, tableMetadata->stripeMetadataList)
|
||||
{
|
||||
|
@ -337,7 +338,7 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
|||
|
||||
bool *projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList);
|
||||
|
||||
StripeSkipList *stripeSkipList = ReadStripeSkipList(RelationGetRelid(relation),
|
||||
StripeSkipList *stripeSkipList = ReadStripeSkipList(relation->rd_node.relNode,
|
||||
stripeMetadata->id,
|
||||
tupleDescriptor,
|
||||
stripeMetadata->blockCount);
|
||||
|
|
|
@ -97,13 +97,11 @@ cstore_init_write_state(Relation relation)
|
|||
TupleDesc tupdesc = RelationGetDescr(relation);
|
||||
|
||||
elog(LOG, "initializing write state for relation %d", relation->rd_id);
|
||||
CStoreWriteState = CStoreBeginWrite(relation->rd_id,
|
||||
CStoreWriteState = CStoreBeginWrite(relation,
|
||||
cstoreOptions->compressionType,
|
||||
cstoreOptions->stripeRowCount,
|
||||
cstoreOptions->blockRowCount,
|
||||
tupdesc);
|
||||
|
||||
CStoreWriteState->relation = relation;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -134,16 +132,12 @@ cstore_beginscan(Relation relation, Snapshot snapshot,
|
|||
ParallelTableScanDesc parallel_scan,
|
||||
uint32 flags)
|
||||
{
|
||||
Oid relid = relation->rd_id;
|
||||
TupleDesc tupdesc = relation->rd_att;
|
||||
CStoreOptions *cstoreOptions = NULL;
|
||||
TableReadState *readState = NULL;
|
||||
CStoreScanDesc scan = palloc(sizeof(CStoreScanDescData));
|
||||
List *columnList = NIL;
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(GetCStoreMemoryContext());
|
||||
|
||||
cstoreOptions = CStoreTableAMGetOptions();
|
||||
|
||||
scan->cs_base.rs_rd = relation;
|
||||
scan->cs_base.rs_snapshot = snapshot;
|
||||
scan->cs_base.rs_nkeys = nkeys;
|
||||
|
@ -171,8 +165,7 @@ cstore_beginscan(Relation relation, Snapshot snapshot,
|
|||
columnList = lappend(columnList, var);
|
||||
}
|
||||
|
||||
readState = CStoreBeginRead(relid, tupdesc, columnList, NULL);
|
||||
readState->relation = relation;
|
||||
readState = CStoreBeginRead(relation, tupdesc, columnList, NULL);
|
||||
|
||||
scan->cs_readState = readState;
|
||||
|
||||
|
@ -443,7 +436,7 @@ cstore_relation_set_new_filenode(Relation rel,
|
|||
*freezeXid = RecentXmin;
|
||||
*minmulti = GetOldestMultiXactId();
|
||||
srel = RelationCreateStorage(*newrnode, persistence);
|
||||
InitializeCStoreTableFile(rel->rd_id, rel, CStoreTableAMGetOptions());
|
||||
InitializeCStoreTableFile(newrnode->relNode, CStoreTableAMGetOptions());
|
||||
smgrclose(srel);
|
||||
}
|
||||
|
||||
|
|
|
@ -58,7 +58,7 @@ static StringInfo CopyStringInfo(StringInfo sourceString);
|
|||
* will be added.
|
||||
*/
|
||||
TableWriteState *
|
||||
CStoreBeginWrite(Oid relationId,
|
||||
CStoreBeginWrite(Relation relation,
|
||||
CompressionType compressionType,
|
||||
uint64 stripeMaxRowCount, uint32 blockRowCount,
|
||||
TupleDesc tupleDescriptor)
|
||||
|
@ -73,8 +73,9 @@ CStoreBeginWrite(Oid relationId,
|
|||
bool *columnMaskArray = NULL;
|
||||
BlockData *blockData = NULL;
|
||||
uint64 currentStripeId = 0;
|
||||
Oid relNode = relation->rd_node.relNode;
|
||||
|
||||
tableMetadata = ReadTableMetadata(relationId);
|
||||
tableMetadata = ReadTableMetadata(relNode);
|
||||
|
||||
/*
|
||||
* If stripeMetadataList is not empty, jump to the position right after
|
||||
|
@ -127,7 +128,7 @@ CStoreBeginWrite(Oid relationId,
|
|||
blockData = CreateEmptyBlockData(columnCount, columnMaskArray, blockRowCount);
|
||||
|
||||
writeState = palloc0(sizeof(TableWriteState));
|
||||
writeState->relationId = relationId;
|
||||
writeState->relation = relation;
|
||||
writeState->tableMetadata = tableMetadata;
|
||||
writeState->compressionType = compressionType;
|
||||
writeState->stripeMaxRowCount = stripeMaxRowCount;
|
||||
|
@ -251,7 +252,8 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul
|
|||
* doesn't free it.
|
||||
*/
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
InsertStripeMetadataRow(writeState->relationId, &stripeMetadata);
|
||||
InsertStripeMetadataRow(writeState->relation->rd_node.relNode,
|
||||
&stripeMetadata);
|
||||
AppendStripeMetadata(tableMetadata, stripeMetadata);
|
||||
}
|
||||
else
|
||||
|
@ -280,7 +282,8 @@ CStoreEndWrite(TableWriteState *writeState)
|
|||
MemoryContextReset(writeState->stripeWriteContext);
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
InsertStripeMetadataRow(writeState->relationId, &stripeMetadata);
|
||||
InsertStripeMetadataRow(writeState->relation->rd_node.relNode,
|
||||
&stripeMetadata);
|
||||
AppendStripeMetadata(writeState->tableMetadata, stripeMetadata);
|
||||
}
|
||||
|
||||
|
@ -543,7 +546,8 @@ FlushStripe(TableWriteState *writeState)
|
|||
}
|
||||
|
||||
/* create skip list and footer buffers */
|
||||
SaveStripeSkipList(writeState->relationId, writeState->currentStripeId,
|
||||
SaveStripeSkipList(writeState->relation->rd_node.relNode,
|
||||
writeState->currentStripeId,
|
||||
stripeSkipList, tupleDescriptor);
|
||||
|
||||
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
|
||||
|
|
Loading…
Reference in New Issue