mirror of https://github.com/citusdata/citus.git
Encapsulate snapshot used for reading stripes in cstore_metadata_tables
parent
76a71aa61a
commit
e481e73d18
1
cstore.h
1
cstore.h
|
@ -285,6 +285,7 @@ extern void DeleteDataFileMetadataRowIfExists(Oid relfilenode);
|
|||
extern void InitCStoreDataFileMetadata(Oid relfilenode, int blockRowCount);
|
||||
extern void InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe);
|
||||
extern DataFileMetadata * ReadDataFileMetadata(Oid relfilenode, bool missingOk);
|
||||
extern uint64 GetHighestUsedAddress(Oid relfilenode);
|
||||
extern void SaveStripeSkipList(Oid relfilenode, uint64 stripe,
|
||||
StripeSkipList *stripeSkipList,
|
||||
TupleDesc tupleDescriptor);
|
||||
|
|
|
@ -43,6 +43,7 @@ typedef struct
|
|||
EState *estate;
|
||||
} ModifyState;
|
||||
|
||||
static List * ReadDataFileStripeList(Oid relfilenode, Snapshot snapshot);
|
||||
static Oid CStoreStripesRelationId(void);
|
||||
static Oid CStoreStripesIndexRelationId(void);
|
||||
static Oid CStoreDataFilesRelationId(void);
|
||||
|
@ -345,17 +346,8 @@ InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe)
|
|||
DataFileMetadata *
|
||||
ReadDataFileMetadata(Oid relfilenode, bool missingOk)
|
||||
{
|
||||
Oid cstoreStripesOid = InvalidOid;
|
||||
Relation cstoreStripes = NULL;
|
||||
Relation index = NULL;
|
||||
TupleDesc tupleDescriptor = NULL;
|
||||
ScanKeyData scanKey[1];
|
||||
SysScanDesc scanDescriptor = NULL;
|
||||
HeapTuple heapTuple;
|
||||
bool found = false;
|
||||
|
||||
DataFileMetadata *datafileMetadata = palloc0(sizeof(DataFileMetadata));
|
||||
found = ReadCStoreDataFiles(relfilenode, &datafileMetadata->blockRowCount);
|
||||
bool found = ReadCStoreDataFiles(relfilenode, &datafileMetadata->blockRowCount);
|
||||
if (!found)
|
||||
{
|
||||
if (!missingOk)
|
||||
|
@ -369,6 +361,56 @@ ReadDataFileMetadata(Oid relfilenode, bool missingOk)
|
|||
}
|
||||
}
|
||||
|
||||
datafileMetadata->stripeMetadataList =
|
||||
ReadDataFileStripeList(relfilenode, GetTransactionSnapshot());
|
||||
|
||||
return datafileMetadata;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetHighestUsedAddress returns the highest used address for the given
|
||||
* relfilenode across all active and inactive transactions.
|
||||
*/
|
||||
uint64
|
||||
GetHighestUsedAddress(Oid relfilenode)
|
||||
{
|
||||
uint64 highestUsedAddress = 0;
|
||||
ListCell *stripeMetadataCell = NULL;
|
||||
List *stripeMetadataList = NIL;
|
||||
|
||||
SnapshotData SnapshotDirty;
|
||||
InitDirtySnapshot(SnapshotDirty);
|
||||
|
||||
stripeMetadataList = ReadDataFileStripeList(relfilenode, &SnapshotDirty);
|
||||
|
||||
foreach(stripeMetadataCell, stripeMetadataList)
|
||||
{
|
||||
StripeMetadata *stripe = lfirst(stripeMetadataCell);
|
||||
uint64 lastByte = stripe->fileOffset + stripe->dataLength - 1;
|
||||
highestUsedAddress = Max(highestUsedAddress, lastByte);
|
||||
}
|
||||
|
||||
return highestUsedAddress;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ReadDataFileStripeList reads the stripe list for a given relfilenode
|
||||
* in the given snapshot.
|
||||
*/
|
||||
static List *
|
||||
ReadDataFileStripeList(Oid relfilenode, Snapshot snapshot)
|
||||
{
|
||||
List *stripeMetadataList = NIL;
|
||||
Oid cstoreStripesOid = InvalidOid;
|
||||
Relation cstoreStripes = NULL;
|
||||
Relation index = NULL;
|
||||
TupleDesc tupleDescriptor = NULL;
|
||||
ScanKeyData scanKey[1];
|
||||
SysScanDesc scanDescriptor = NULL;
|
||||
HeapTuple heapTuple;
|
||||
|
||||
ScanKeyInit(&scanKey[0], Anum_cstore_stripes_relfilenode,
|
||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relfilenode));
|
||||
|
||||
|
@ -403,16 +445,14 @@ ReadDataFileMetadata(Oid relfilenode, bool missingOk)
|
|||
stripeMetadata->rowCount = DatumGetInt64(
|
||||
datumArray[Anum_cstore_stripes_row_count - 1]);
|
||||
|
||||
datafileMetadata->stripeMetadataList = lappend(
|
||||
datafileMetadata->stripeMetadataList,
|
||||
stripeMetadata);
|
||||
stripeMetadataList = lappend(stripeMetadataList, stripeMetadata);
|
||||
}
|
||||
|
||||
systable_endscan_ordered(scanDescriptor);
|
||||
index_close(index, NoLock);
|
||||
heap_close(cstoreStripes, NoLock);
|
||||
|
||||
return datafileMetadata;
|
||||
return stripeMetadataList;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -601,6 +601,8 @@ cstore_vacuum_rel(Relation rel, VacuumParams *params,
|
|||
/* this should have been resolved by vacuum.c until now */
|
||||
Assert(params->truncate != VACOPT_TERNARY_DEFAULT);
|
||||
|
||||
LogRelationStats(rel, elevel);
|
||||
|
||||
/*
|
||||
* We don't have updates, deletes, or concurrent updates, so all we
|
||||
* care for now is truncating the unused space at the end of storage.
|
||||
|
@ -609,8 +611,6 @@ cstore_vacuum_rel(Relation rel, VacuumParams *params,
|
|||
{
|
||||
TruncateCStore(rel, elevel);
|
||||
}
|
||||
|
||||
LogRelationStats(rel, elevel);
|
||||
}
|
||||
|
||||
|
||||
|
@ -727,8 +727,7 @@ TruncateCStore(Relation rel, int elevel)
|
|||
PGRUsage ru0;
|
||||
BlockNumber old_rel_pages = 0;
|
||||
BlockNumber new_rel_pages = 0;
|
||||
DataFileMetadata *metadata = NULL;
|
||||
ListCell *stripeMetadataCell = NULL;
|
||||
SmgrAddr highestPhysicalAddress;
|
||||
|
||||
pg_rusage_init(&ru0);
|
||||
|
||||
|
@ -765,17 +764,15 @@ TruncateCStore(Relation rel, int elevel)
|
|||
old_rel_pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
||||
RelationCloseSmgr(rel);
|
||||
|
||||
metadata = ReadDataFileMetadata(rel->rd_node.relNode, false);
|
||||
|
||||
/* loop over stripes and find max used block */
|
||||
foreach(stripeMetadataCell, metadata->stripeMetadataList)
|
||||
{
|
||||
StripeMetadata *stripe = lfirst(stripeMetadataCell);
|
||||
uint64 lastByte = stripe->fileOffset + stripe->dataLength - 1;
|
||||
SmgrAddr addr = logical_to_smgr(lastByte);
|
||||
new_rel_pages = Max(new_rel_pages, addr.blockno + 1);
|
||||
}
|
||||
/*
|
||||
* Due to the AccessExclusive lock there's no danger that
|
||||
* new stripes be added beyond highestPhysicalAddress while
|
||||
* we're truncating.
|
||||
*/
|
||||
highestPhysicalAddress =
|
||||
logical_to_smgr(GetHighestUsedAddress(rel->rd_node.relNode));
|
||||
|
||||
new_rel_pages = highestPhysicalAddress.blockno + 1;
|
||||
if (new_rel_pages == old_rel_pages)
|
||||
{
|
||||
UnlockRelation(rel, AccessExclusiveLock);
|
||||
|
|
|
@ -135,13 +135,13 @@ SELECT pg_size_pretty(pg_relation_size('t'));
|
|||
COMMIT;
|
||||
-- vacuum should truncate the relation to the usable space
|
||||
VACUUM VERBOSE t;
|
||||
INFO: "t": truncated 7 to 2 pages
|
||||
DETAIL: CPU: user: 0.00 s, system: 0.00 s, elapsed: 0.00 s
|
||||
INFO: statistics for "t":
|
||||
total file size: 16384, total data size: 10754
|
||||
total file size: 57344, total data size: 10754
|
||||
total row count: 2530, stripe count: 3, average rows per stripe: 843
|
||||
block count: 3, containing data for dropped columns: 0, none compressed: 3, pglz compressed: 0
|
||||
|
||||
INFO: "t": truncated 7 to 2 pages
|
||||
DETAIL: CPU: user: 0.00 s, system: 0.00 s, elapsed: 0.00 s
|
||||
SELECT pg_size_pretty(pg_relation_size('t'));
|
||||
pg_size_pretty
|
||||
----------------
|
||||
|
|
|
@ -10,12 +10,12 @@ step s1-begin:
|
|||
step s1-insert:
|
||||
INSERT INTO test_vacuum_vs_insert SELECT i, 2 * i FROM generate_series(1, 3) i;
|
||||
|
||||
s2: INFO: "test_vacuum_vs_insert": stopping truncate due to conflicting lock request
|
||||
s2: INFO: statistics for "test_vacuum_vs_insert":
|
||||
total file size: 8192, total data size: 26
|
||||
total row count: 3, stripe count: 1, average rows per stripe: 3
|
||||
block count: 2, containing data for dropped columns: 0, none compressed: 2, pglz compressed: 0
|
||||
|
||||
s2: INFO: "test_vacuum_vs_insert": stopping truncate due to conflicting lock request
|
||||
step s2-vacuum:
|
||||
VACUUM VERBOSE test_vacuum_vs_insert;
|
||||
|
||||
|
|
Loading…
Reference in New Issue