Merge pull request #22 from citusdata/concurrent_writes

Implement concurrent writes
merge-cstore-pykello
Hadi Moshayedi 2020-10-30 15:22:59 -07:00 committed by GitHub
commit efb7cf9bda
10 changed files with 388 additions and 131 deletions

View File

@ -55,7 +55,7 @@ ifeq ($(USE_TABLEAM),yes)
REGRESS += am_create am_load am_query am_analyze am_data_types am_functions \ REGRESS += am_create am_load am_query am_analyze am_data_types am_functions \
am_drop am_insert am_copyto am_alter am_rollback am_truncate am_vacuum am_clean \ am_drop am_insert am_copyto am_alter am_rollback am_truncate am_vacuum am_clean \
am_block_filtering am_join am_trigger am_block_filtering am_join am_trigger
ISOLATION += am_vacuum_vs_insert ISOLATION += am_write_concurrency am_vacuum_vs_insert
endif endif
ifeq ($(enable_coverage),yes) ifeq ($(enable_coverage),yes)

View File

@ -218,15 +218,12 @@ typedef struct TableReadState
/* TableWriteState represents state of a cstore file write operation. */ /* TableWriteState represents state of a cstore file write operation. */
typedef struct TableWriteState typedef struct TableWriteState
{ {
DataFileMetadata *datafileMetadata;
CompressionType compressionType; CompressionType compressionType;
TupleDesc tupleDescriptor; TupleDesc tupleDescriptor;
FmgrInfo **comparisonFunctionArray; FmgrInfo **comparisonFunctionArray;
uint64 currentFileOffset;
Relation relation; Relation relation;
MemoryContext stripeWriteContext; MemoryContext stripeWriteContext;
uint64 currentStripeId;
StripeBuffers *stripeBuffers; StripeBuffers *stripeBuffers;
StripeSkipList *stripeSkipList; StripeSkipList *stripeSkipList;
uint32 stripeMaxRowCount; uint32 stripeMaxRowCount;
@ -284,9 +281,11 @@ extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressio
/* cstore_metadata_tables.c */ /* cstore_metadata_tables.c */
extern void DeleteDataFileMetadataRowIfExists(Oid relfilenode); extern void DeleteDataFileMetadataRowIfExists(Oid relfilenode);
extern void InitCStoreDataFileMetadata(Oid relfilenode, int blockRowCount); extern void InitCStoreDataFileMetadata(Oid relfilenode, int blockRowCount);
extern void InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe);
extern DataFileMetadata * ReadDataFileMetadata(Oid relfilenode, bool missingOk); extern DataFileMetadata * ReadDataFileMetadata(Oid relfilenode, bool missingOk);
extern uint64 GetHighestUsedAddress(Oid relfilenode); extern uint64 GetHighestUsedAddress(Oid relfilenode);
extern StripeMetadata ReserveStripe(Relation rel, uint64 size,
uint64 rowCount, uint64 columnCount,
uint64 blockCount, uint64 blockRowCount);
extern void SaveStripeSkipList(Oid relfilenode, uint64 stripe, extern void SaveStripeSkipList(Oid relfilenode, uint64 stripe,
StripeSkipList *stripeSkipList, StripeSkipList *stripeSkipList,
TupleDesc tupleDescriptor); TupleDesc tupleDescriptor);
@ -317,4 +316,30 @@ logical_to_smgr(uint64 logicalOffset)
} }
/*
* Map a physical page adnd offset address to a logical address.
*/
static inline uint64
smgr_to_logical(SmgrAddr addr)
{
uint64 bytes_per_page = BLCKSZ - SizeOfPageHeaderData;
return bytes_per_page * addr.blockno + addr.offset - SizeOfPageHeaderData;
}
/*
* Get the first usable address of next block.
*/
static inline SmgrAddr
next_block_start(SmgrAddr addr)
{
SmgrAddr result = {
.blockno = addr.blockno + 1,
.offset = SizeOfPageHeaderData
};
return result;
}
#endif /* CSTORE_H */ #endif /* CSTORE_H */

View File

@ -494,10 +494,10 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString)
Assert(copyStatement->relation != NULL); Assert(copyStatement->relation != NULL);
/* /*
* Open and lock the relation. We acquire ShareUpdateExclusiveLock to allow * Open and lock the relation. We acquire RowExclusiveLock to allow
* concurrent reads, but block concurrent writes. * concurrent reads and writes.
*/ */
relation = cstore_fdw_openrv(copyStatement->relation, ShareUpdateExclusiveLock); relation = cstore_fdw_openrv(copyStatement->relation, RowExclusiveLock);
relationId = RelationGetRelid(relation); relationId = RelationGetRelid(relation);
/* allocate column values and nulls arrays */ /* allocate column values and nulls arrays */
@ -572,7 +572,7 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString)
/* end read/write sessions and close the relation */ /* end read/write sessions and close the relation */
EndCopyFrom(copyState); EndCopyFrom(copyState);
CStoreEndWrite(writeState); CStoreEndWrite(writeState);
heap_close(relation, ShareUpdateExclusiveLock); heap_close(relation, RowExclusiveLock);
return processedRowCount; return processedRowCount;
} }
@ -2015,7 +2015,7 @@ CStoreBeginForeignInsert(ModifyTableState *modifyTableState, ResultRelInfo *rela
Relation relation = NULL; Relation relation = NULL;
foreignTableOid = RelationGetRelid(relationInfo->ri_RelationDesc); foreignTableOid = RelationGetRelid(relationInfo->ri_RelationDesc);
relation = cstore_fdw_open(foreignTableOid, ShareUpdateExclusiveLock); relation = cstore_fdw_open(foreignTableOid, RowExclusiveLock);
cstoreOptions = CStoreGetOptions(foreignTableOid); cstoreOptions = CStoreGetOptions(foreignTableOid);
tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc); tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc);
@ -2086,7 +2086,7 @@ CStoreEndForeignInsert(EState *executorState, ResultRelInfo *relationInfo)
Relation relation = writeState->relation; Relation relation = writeState->relation;
CStoreEndWrite(writeState); CStoreEndWrite(writeState);
heap_close(relation, ShareUpdateExclusiveLock); heap_close(relation, RowExclusiveLock);
} }
} }

View File

@ -31,6 +31,8 @@
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "port.h" #include "port.h"
#include "storage/fd.h" #include "storage/fd.h"
#include "storage/lmgr.h"
#include "storage/smgr.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
#include "utils/memutils.h" #include "utils/memutils.h"
@ -43,6 +45,10 @@ typedef struct
EState *estate; EState *estate;
} ModifyState; } ModifyState;
static void InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe);
static void GetHighestUsedAddressAndId(Oid relfilenode,
uint64 *highestUsedAddress,
uint64 *highestUsedId);
static List * ReadDataFileStripeList(Oid relfilenode, Snapshot snapshot); static List * ReadDataFileStripeList(Oid relfilenode, Snapshot snapshot);
static Oid CStoreStripesRelationId(void); static Oid CStoreStripesRelationId(void);
static Oid CStoreStripesIndexRelationId(void); static Oid CStoreStripesIndexRelationId(void);
@ -311,7 +317,7 @@ ReadStripeSkipList(Oid relfilenode, uint64 stripe, TupleDesc tupleDescriptor,
/* /*
* InsertStripeMetadataRow adds a row to cstore_stripes. * InsertStripeMetadataRow adds a row to cstore_stripes.
*/ */
void static void
InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe) InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe)
{ {
bool nulls[Natts_cstore_stripes] = { 0 }; bool nulls[Natts_cstore_stripes] = { 0 };
@ -330,7 +336,9 @@ InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe)
Relation cstoreStripes = heap_open(cstoreStripesOid, RowExclusiveLock); Relation cstoreStripes = heap_open(cstoreStripesOid, RowExclusiveLock);
ModifyState *modifyState = StartModifyRelation(cstoreStripes); ModifyState *modifyState = StartModifyRelation(cstoreStripes);
InsertTupleAndEnforceConstraints(modifyState, values, nulls); InsertTupleAndEnforceConstraints(modifyState, values, nulls);
FinishModifyRelation(modifyState); FinishModifyRelation(modifyState);
CommandCounterIncrement(); CommandCounterIncrement();
@ -376,6 +384,23 @@ uint64
GetHighestUsedAddress(Oid relfilenode) GetHighestUsedAddress(Oid relfilenode)
{ {
uint64 highestUsedAddress = 0; uint64 highestUsedAddress = 0;
uint64 highestUsedId = 0;
GetHighestUsedAddressAndId(relfilenode, &highestUsedAddress, &highestUsedId);
return highestUsedAddress;
}
/*
* GetHighestUsedAddressAndId returns the highest used address and id for
* the given relfilenode across all active and inactive transactions.
*/
static void
GetHighestUsedAddressAndId(Oid relfilenode,
uint64 *highestUsedAddress,
uint64 *highestUsedId)
{
ListCell *stripeMetadataCell = NULL; ListCell *stripeMetadataCell = NULL;
List *stripeMetadataList = NIL; List *stripeMetadataList = NIL;
@ -384,14 +409,83 @@ GetHighestUsedAddress(Oid relfilenode)
stripeMetadataList = ReadDataFileStripeList(relfilenode, &SnapshotDirty); stripeMetadataList = ReadDataFileStripeList(relfilenode, &SnapshotDirty);
*highestUsedId = 0;
*highestUsedAddress = 0;
foreach(stripeMetadataCell, stripeMetadataList) foreach(stripeMetadataCell, stripeMetadataList)
{ {
StripeMetadata *stripe = lfirst(stripeMetadataCell); StripeMetadata *stripe = lfirst(stripeMetadataCell);
uint64 lastByte = stripe->fileOffset + stripe->dataLength - 1; uint64 lastByte = stripe->fileOffset + stripe->dataLength - 1;
highestUsedAddress = Max(highestUsedAddress, lastByte); *highestUsedAddress = Max(*highestUsedAddress, lastByte);
*highestUsedId = Max(*highestUsedId, stripe->id);
}
}
/*
* ReserveStripe reserves and stripe of given size for the given relation,
* and inserts it into cstore_stripes. It is guaranteed that concurrent
* writes won't overwrite the returned stripe.
*/
StripeMetadata
ReserveStripe(Relation rel, uint64 sizeBytes,
uint64 rowCount, uint64 columnCount,
uint64 blockCount, uint64 blockRowCount)
{
StripeMetadata stripe = { 0 };
Oid relfilenode = InvalidOid;
uint64 currLogicalHigh = 0;
SmgrAddr currSmgrHigh;
uint64 nblocks = 0;
uint64 resLogicalStart = 0;
SmgrAddr resSmgrStart;
uint64 resLogicalEnd = 0;
SmgrAddr resSmgrEnd;
uint64 highestId = 0;
/*
* We take ShareUpdateExclusiveLock here, so two space
* reservations conflict, space reservation <-> vacuum
* conflict, but space reservation doesn't conflict with
* reads & writes.
*/
LockRelation(rel, ShareUpdateExclusiveLock);
relfilenode = rel->rd_node.relNode;
GetHighestUsedAddressAndId(relfilenode, &currLogicalHigh, &highestId);
currSmgrHigh = logical_to_smgr(currLogicalHigh);
resSmgrStart = next_block_start(currSmgrHigh);
resLogicalStart = smgr_to_logical(resSmgrStart);
resLogicalEnd = resLogicalStart + sizeBytes - 1;
resSmgrEnd = logical_to_smgr(resLogicalEnd);
RelationOpenSmgr(rel);
nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
while (resSmgrEnd.blockno >= nblocks)
{
Buffer newBuffer = ReadBuffer(rel, P_NEW);
ReleaseBuffer(newBuffer);
nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
} }
return highestUsedAddress; RelationCloseSmgr(rel);
stripe.fileOffset = resLogicalStart;
stripe.dataLength = sizeBytes;
stripe.blockCount = blockCount;
stripe.blockRowCount = blockRowCount;
stripe.columnCount = columnCount;
stripe.rowCount = rowCount;
stripe.id = highestId + 1;
InsertStripeMetadataRow(relfilenode, &stripe);
UnlockRelation(rel, ShareUpdateExclusiveLock);
return stripe;
} }
@ -419,7 +513,7 @@ ReadDataFileStripeList(Oid relfilenode, Snapshot snapshot)
index = index_open(CStoreStripesIndexRelationId(), AccessShareLock); index = index_open(CStoreStripesIndexRelationId(), AccessShareLock);
tupleDescriptor = RelationGetDescr(cstoreStripes); tupleDescriptor = RelationGetDescr(cstoreStripes);
scanDescriptor = systable_beginscan_ordered(cstoreStripes, index, NULL, 1, scanDescriptor = systable_beginscan_ordered(cstoreStripes, index, snapshot, 1,
scanKey); scanKey);
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
@ -593,6 +687,7 @@ InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values, bool *nulls)
#if PG_VERSION_NUM >= 120000 #if PG_VERSION_NUM >= 120000
TupleTableSlot *slot = ExecInitExtraTupleSlot(state->estate, tupleDescriptor, TupleTableSlot *slot = ExecInitExtraTupleSlot(state->estate, tupleDescriptor,
&TTSOpsHeapTuple); &TTSOpsHeapTuple);
ExecStoreHeapTuple(tuple, slot, false); ExecStoreHeapTuple(tuple, slot, false);
#else #else
TupleTableSlot *slot = ExecInitExtraTupleSlot(state->estate, tupleDescriptor); TupleTableSlot *slot = ExecInitExtraTupleSlot(state->estate, tupleDescriptor);

View File

@ -120,8 +120,6 @@ ResetCStoreMemoryContext()
static void static void
cstore_init_write_state(Relation relation) cstore_init_write_state(Relation relation)
{ {
/*TODO: upgrade lock to serialize writes */
if (CStoreWriteState != NULL) if (CStoreWriteState != NULL)
{ {
/* TODO: consider whether it's possible for a new write to start */ /* TODO: consider whether it's possible for a new write to start */

View File

@ -33,7 +33,7 @@ static StripeBuffers * CreateEmptyStripeBuffers(uint32 stripeMaxRowCount,
static StripeSkipList * CreateEmptyStripeSkipList(uint32 stripeMaxRowCount, static StripeSkipList * CreateEmptyStripeSkipList(uint32 stripeMaxRowCount,
uint32 blockRowCount, uint32 blockRowCount,
uint32 columnCount); uint32 columnCount);
static StripeMetadata FlushStripe(TableWriteState *writeState); static void FlushStripe(TableWriteState *writeState);
static StringInfo SerializeBoolArray(bool *boolArray, uint32 boolArrayLength); static StringInfo SerializeBoolArray(bool *boolArray, uint32 boolArrayLength);
static void SerializeSingleDatum(StringInfo datumBuffer, Datum datum, static void SerializeSingleDatum(StringInfo datumBuffer, Datum datum,
bool datumTypeByValue, int datumTypeLength, bool datumTypeByValue, int datumTypeLength,
@ -45,8 +45,6 @@ static void UpdateBlockSkipNodeMinMax(ColumnBlockSkipNode *blockSkipNode,
int columnTypeLength, Oid columnCollation, int columnTypeLength, Oid columnCollation,
FmgrInfo *comparisonFunction); FmgrInfo *comparisonFunction);
static Datum DatumCopy(Datum datum, bool datumTypeByValue, int datumTypeLength); static Datum DatumCopy(Datum datum, bool datumTypeByValue, int datumTypeLength);
static void AppendStripeMetadata(DataFileMetadata *datafileMetadata,
StripeMetadata stripeMetadata);
static StringInfo CopyStringInfo(StringInfo sourceString); static StringInfo CopyStringInfo(StringInfo sourceString);
@ -64,34 +62,12 @@ CStoreBeginWrite(Relation relation,
TupleDesc tupleDescriptor) TupleDesc tupleDescriptor)
{ {
TableWriteState *writeState = NULL; TableWriteState *writeState = NULL;
DataFileMetadata *datafileMetadata = NULL;
FmgrInfo **comparisonFunctionArray = NULL; FmgrInfo **comparisonFunctionArray = NULL;
MemoryContext stripeWriteContext = NULL; MemoryContext stripeWriteContext = NULL;
uint64 currentFileOffset = 0;
uint32 columnCount = 0; uint32 columnCount = 0;
uint32 columnIndex = 0; uint32 columnIndex = 0;
bool *columnMaskArray = NULL; bool *columnMaskArray = NULL;
BlockData *blockData = NULL; BlockData *blockData = NULL;
uint64 currentStripeId = 0;
Oid relNode = relation->rd_node.relNode;
datafileMetadata = ReadDataFileMetadata(relNode, false);
/*
* If stripeMetadataList is not empty, jump to the position right after
* the last position.
*/
if (datafileMetadata->stripeMetadataList != NIL)
{
StripeMetadata *lastStripe = NULL;
uint64 lastStripeSize = 0;
lastStripe = llast(datafileMetadata->stripeMetadataList);
lastStripeSize += lastStripe->dataLength;
currentFileOffset = lastStripe->fileOffset + lastStripeSize;
currentStripeId = lastStripe->id + 1;
}
/* get comparison function pointers for each of the columns */ /* get comparison function pointers for each of the columns */
columnCount = tupleDescriptor->natts; columnCount = tupleDescriptor->natts;
@ -129,19 +105,16 @@ CStoreBeginWrite(Relation relation,
writeState = palloc0(sizeof(TableWriteState)); writeState = palloc0(sizeof(TableWriteState));
writeState->relation = relation; writeState->relation = relation;
writeState->datafileMetadata = datafileMetadata;
writeState->compressionType = compressionType; writeState->compressionType = compressionType;
writeState->stripeMaxRowCount = stripeMaxRowCount; writeState->stripeMaxRowCount = stripeMaxRowCount;
writeState->blockRowCount = blockRowCount; writeState->blockRowCount = blockRowCount;
writeState->tupleDescriptor = tupleDescriptor; writeState->tupleDescriptor = tupleDescriptor;
writeState->currentFileOffset = currentFileOffset;
writeState->comparisonFunctionArray = comparisonFunctionArray; writeState->comparisonFunctionArray = comparisonFunctionArray;
writeState->stripeBuffers = NULL; writeState->stripeBuffers = NULL;
writeState->stripeSkipList = NULL; writeState->stripeSkipList = NULL;
writeState->stripeWriteContext = stripeWriteContext; writeState->stripeWriteContext = stripeWriteContext;
writeState->blockData = blockData; writeState->blockData = blockData;
writeState->compressionBuffer = NULL; writeState->compressionBuffer = NULL;
writeState->currentStripeId = currentStripeId;
return writeState; return writeState;
} }
@ -164,7 +137,6 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul
StripeBuffers *stripeBuffers = writeState->stripeBuffers; StripeBuffers *stripeBuffers = writeState->stripeBuffers;
StripeSkipList *stripeSkipList = writeState->stripeSkipList; StripeSkipList *stripeSkipList = writeState->stripeSkipList;
uint32 columnCount = writeState->tupleDescriptor->natts; uint32 columnCount = writeState->tupleDescriptor->natts;
DataFileMetadata *datafileMetadata = writeState->datafileMetadata;
const uint32 blockRowCount = writeState->blockRowCount; const uint32 blockRowCount = writeState->blockRowCount;
BlockData *blockData = writeState->blockData; BlockData *blockData = writeState->blockData;
MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext); MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext);
@ -238,28 +210,14 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul
stripeBuffers->rowCount++; stripeBuffers->rowCount++;
if (stripeBuffers->rowCount >= writeState->stripeMaxRowCount) if (stripeBuffers->rowCount >= writeState->stripeMaxRowCount)
{ {
StripeMetadata stripeMetadata = FlushStripe(writeState); FlushStripe(writeState);
MemoryContextReset(writeState->stripeWriteContext);
writeState->currentStripeId++;
/* set stripe data and skip list to NULL so they are recreated next time */ /* set stripe data and skip list to NULL so they are recreated next time */
writeState->stripeBuffers = NULL; writeState->stripeBuffers = NULL;
writeState->stripeSkipList = NULL; writeState->stripeSkipList = NULL;
}
/*
* Append stripeMetadata in old context so next MemoryContextReset
* doesn't free it.
*/
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
InsertStripeMetadataRow(writeState->relation->rd_node.relNode,
&stripeMetadata);
AppendStripeMetadata(datafileMetadata, stripeMetadata);
}
else
{
MemoryContextSwitchTo(oldContext);
}
} }
@ -278,17 +236,13 @@ CStoreEndWrite(TableWriteState *writeState)
{ {
MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext); MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext);
StripeMetadata stripeMetadata = FlushStripe(writeState); FlushStripe(writeState);
MemoryContextReset(writeState->stripeWriteContext); MemoryContextReset(writeState->stripeWriteContext);
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
InsertStripeMetadataRow(writeState->relation->rd_node.relNode,
&stripeMetadata);
AppendStripeMetadata(writeState->datafileMetadata, stripeMetadata);
} }
MemoryContextDelete(writeState->stripeWriteContext); MemoryContextDelete(writeState->stripeWriteContext);
list_free_deep(writeState->datafileMetadata->stripeMetadataList);
pfree(writeState->comparisonFunctionArray); pfree(writeState->comparisonFunctionArray);
FreeBlockData(writeState->blockData); FreeBlockData(writeState->blockData);
pfree(writeState); pfree(writeState);
@ -366,11 +320,9 @@ CreateEmptyStripeSkipList(uint32 stripeMaxRowCount, uint32 blockRowCount,
static void static void
WriteToSmgr(TableWriteState *writeState, char *data, uint32 dataLength) WriteToSmgr(Relation rel, uint64 logicalOffset, char *data, uint32 dataLength)
{ {
uint64 logicalOffset = writeState->currentFileOffset;
uint64 remaining = dataLength; uint64 remaining = dataLength;
Relation rel = writeState->relation;
Buffer buffer; Buffer buffer;
while (remaining > 0) while (remaining > 0)
@ -383,14 +335,7 @@ WriteToSmgr(TableWriteState *writeState, char *data, uint32 dataLength)
RelationOpenSmgr(rel); RelationOpenSmgr(rel);
nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
Assert(addr.blockno < nblocks);
while (addr.blockno >= nblocks)
{
Buffer newBuffer = ReadBuffer(rel, P_NEW);
ReleaseBuffer(newBuffer);
nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
}
RelationCloseSmgr(rel); RelationCloseSmgr(rel);
buffer = ReadBuffer(rel, addr.blockno); buffer = ReadBuffer(rel, addr.blockno);
@ -459,7 +404,7 @@ WriteToSmgr(TableWriteState *writeState, char *data, uint32 dataLength)
* the function creates the skip list and footer buffers. Finally, the function * the function creates the skip list and footer buffers. Finally, the function
* flushes the skip list, data, and footer buffers to the file. * flushes the skip list, data, and footer buffers to the file.
*/ */
static StripeMetadata static void
FlushStripe(TableWriteState *writeState) FlushStripe(TableWriteState *writeState)
{ {
StripeMetadata stripeMetadata = { 0 }; StripeMetadata stripeMetadata = { 0 };
@ -474,8 +419,9 @@ FlushStripe(TableWriteState *writeState)
uint32 blockRowCount = writeState->blockRowCount; uint32 blockRowCount = writeState->blockRowCount;
uint32 lastBlockIndex = stripeBuffers->rowCount / blockRowCount; uint32 lastBlockIndex = stripeBuffers->rowCount / blockRowCount;
uint32 lastBlockRowCount = stripeBuffers->rowCount % blockRowCount; uint32 lastBlockRowCount = stripeBuffers->rowCount % blockRowCount;
uint64 initialFileOffset = writeState->currentFileOffset; uint64 currentFileOffset = 0;
uint64 stripeSize = 0; uint64 stripeSize = 0;
uint64 stripeRowCount = 0;
/* /*
* check if the last block needs serialization , the last block was not serialized * check if the last block needs serialization , the last block was not serialized
@ -520,6 +466,18 @@ FlushStripe(TableWriteState *writeState)
} }
} }
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
{
stripeRowCount +=
stripeSkipList->blockSkipNodeArray[0][blockIndex].rowCount;
}
stripeMetadata = ReserveStripe(writeState->relation, stripeSize,
stripeRowCount, columnCount, blockCount,
blockRowCount);
currentFileOffset = stripeMetadata.fileOffset;
/* /*
* Each stripe has only one section: * Each stripe has only one section:
* Data section, in which we store data for each column continuously. * Data section, in which we store data for each column continuously.
@ -541,8 +499,9 @@ FlushStripe(TableWriteState *writeState)
columnBuffers->blockBuffersArray[blockIndex]; columnBuffers->blockBuffersArray[blockIndex];
StringInfo existsBuffer = blockBuffers->existsBuffer; StringInfo existsBuffer = blockBuffers->existsBuffer;
WriteToSmgr(writeState, existsBuffer->data, existsBuffer->len); WriteToSmgr(writeState->relation, currentFileOffset,
writeState->currentFileOffset += existsBuffer->len; existsBuffer->data, existsBuffer->len);
currentFileOffset += existsBuffer->len;
} }
for (blockIndex = 0; blockIndex < stripeSkipList->blockCount; blockIndex++) for (blockIndex = 0; blockIndex < stripeSkipList->blockCount; blockIndex++)
@ -551,30 +510,16 @@ FlushStripe(TableWriteState *writeState)
columnBuffers->blockBuffersArray[blockIndex]; columnBuffers->blockBuffersArray[blockIndex];
StringInfo valueBuffer = blockBuffers->valueBuffer; StringInfo valueBuffer = blockBuffers->valueBuffer;
WriteToSmgr(writeState, valueBuffer->data, valueBuffer->len); WriteToSmgr(writeState->relation, currentFileOffset,
writeState->currentFileOffset += valueBuffer->len; valueBuffer->data, valueBuffer->len);
currentFileOffset += valueBuffer->len;
} }
} }
/* create skip list and footer buffers */ /* create skip list and footer buffers */
SaveStripeSkipList(writeState->relation->rd_node.relNode, SaveStripeSkipList(writeState->relation->rd_node.relNode,
writeState->currentStripeId, stripeMetadata.id,
stripeSkipList, tupleDescriptor); stripeSkipList, tupleDescriptor);
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
{
stripeMetadata.rowCount +=
stripeSkipList->blockSkipNodeArray[0][blockIndex].rowCount;
}
stripeMetadata.fileOffset = initialFileOffset;
stripeMetadata.dataLength = stripeSize;
stripeMetadata.id = writeState->currentStripeId;
stripeMetadata.blockCount = blockCount;
stripeMetadata.blockRowCount = writeState->blockRowCount;
stripeMetadata.columnCount = columnCount;
return stripeMetadata;
} }
@ -797,21 +742,6 @@ DatumCopy(Datum datum, bool datumTypeByValue, int datumTypeLength)
} }
/*
* AppendStripeMetadata adds a copy of given stripeMetadata to the given
* table footer's stripeMetadataList.
*/
static void
AppendStripeMetadata(DataFileMetadata *datafileMetadata, StripeMetadata stripeMetadata)
{
StripeMetadata *stripeMetadataCopy = palloc0(sizeof(StripeMetadata));
memcpy(stripeMetadataCopy, &stripeMetadata, sizeof(StripeMetadata));
datafileMetadata->stripeMetadataList = lappend(datafileMetadata->stripeMetadataList,
stripeMetadataCopy);
}
/* /*
* CopyStringInfo creates a deep copy of given source string allocating only needed * CopyStringInfo creates a deep copy of given source string allocating only needed
* amount of memory. * amount of memory.

View File

@ -68,24 +68,24 @@ ALTER TABLE t DROP COLUMN a;
SELECT stripe, attr, block, minimum_value IS NULL, maximum_value IS NULL FROM cstore.cstore_skipnodes a, pg_class b WHERE a.relfilenode=b.relfilenode AND b.relname='t' ORDER BY 1, 2, 3; SELECT stripe, attr, block, minimum_value IS NULL, maximum_value IS NULL FROM cstore.cstore_skipnodes a, pg_class b WHERE a.relfilenode=b.relfilenode AND b.relname='t' ORDER BY 1, 2, 3;
stripe | attr | block | ?column? | ?column? stripe | attr | block | ?column? | ?column?
--------+------+-------+----------+---------- --------+------+-------+----------+----------
0 | 1 | 0 | f | f
0 | 2 | 0 | f | f
1 | 1 | 0 | f | f 1 | 1 | 0 | f | f
1 | 2 | 0 | f | f 1 | 2 | 0 | f | f
2 | 1 | 0 | f | f 2 | 1 | 0 | f | f
2 | 2 | 0 | f | f 2 | 2 | 0 | f | f
3 | 1 | 0 | f | f
3 | 2 | 0 | f | f
(6 rows) (6 rows)
VACUUM FULL t; VACUUM FULL t;
SELECT stripe, attr, block, minimum_value IS NULL, maximum_value IS NULL FROM cstore.cstore_skipnodes a, pg_class b WHERE a.relfilenode=b.relfilenode AND b.relname='t' ORDER BY 1, 2, 3; SELECT stripe, attr, block, minimum_value IS NULL, maximum_value IS NULL FROM cstore.cstore_skipnodes a, pg_class b WHERE a.relfilenode=b.relfilenode AND b.relname='t' ORDER BY 1, 2, 3;
stripe | attr | block | ?column? | ?column? stripe | attr | block | ?column? | ?column?
--------+------+-------+----------+---------- --------+------+-------+----------+----------
0 | 1 | 0 | t | t
0 | 2 | 0 | f | f
1 | 1 | 0 | t | t 1 | 1 | 0 | t | t
1 | 2 | 0 | f | f 1 | 2 | 0 | f | f
2 | 1 | 0 | t | t 2 | 1 | 0 | t | t
2 | 2 | 0 | f | f 2 | 2 | 0 | f | f
3 | 1 | 0 | t | t
3 | 2 | 0 | f | f
(6 rows) (6 rows)
-- Make sure we cleaned-up the transient table metadata after VACUUM FULL commands -- Make sure we cleaned-up the transient table metadata after VACUUM FULL commands
@ -107,14 +107,14 @@ SELECT count(*) FROM t;
SELECT pg_size_pretty(pg_relation_size('t')); SELECT pg_size_pretty(pg_relation_size('t'));
pg_size_pretty pg_size_pretty
---------------- ----------------
16 kB 32 kB
(1 row) (1 row)
INSERT INTO t SELECT i FROM generate_series(1, 10000) i; INSERT INTO t SELECT i FROM generate_series(1, 10000) i;
SELECT pg_size_pretty(pg_relation_size('t')); SELECT pg_size_pretty(pg_relation_size('t'));
pg_size_pretty pg_size_pretty
---------------- ----------------
56 kB 112 kB
(1 row) (1 row)
SELECT count(*) FROM t; SELECT count(*) FROM t;
@ -129,23 +129,23 @@ ROLLBACK TO SAVEPOINT s1;
SELECT pg_size_pretty(pg_relation_size('t')); SELECT pg_size_pretty(pg_relation_size('t'));
pg_size_pretty pg_size_pretty
---------------- ----------------
56 kB 112 kB
(1 row) (1 row)
COMMIT; COMMIT;
-- vacuum should truncate the relation to the usable space -- vacuum should truncate the relation to the usable space
VACUUM VERBOSE t; VACUUM VERBOSE t;
INFO: statistics for "t": INFO: statistics for "t":
total file size: 57344, total data size: 10754 total file size: 114688, total data size: 10754
total row count: 2530, stripe count: 3, average rows per stripe: 843 total row count: 2530, stripe count: 3, average rows per stripe: 843
block count: 3, containing data for dropped columns: 0, none compressed: 3, pglz compressed: 0 block count: 3, containing data for dropped columns: 0, none compressed: 3, pglz compressed: 0
INFO: "t": truncated 7 to 2 pages INFO: "t": truncated 14 to 4 pages
DETAIL: CPU: user: 0.00 s, system: 0.00 s, elapsed: 0.00 s DETAIL: CPU: user: 0.00 s, system: 0.00 s, elapsed: 0.00 s
SELECT pg_size_pretty(pg_relation_size('t')); SELECT pg_size_pretty(pg_relation_size('t'));
pg_size_pretty pg_size_pretty
---------------- ----------------
16 kB 32 kB
(1 row) (1 row)
SELECT count(*) FROM t; SELECT count(*) FROM t;
@ -172,7 +172,7 @@ INSERT INTO t SELECT i / 5 FROM generate_series(1, 1500) i;
COMMIT; COMMIT;
VACUUM VERBOSE t; VACUUM VERBOSE t;
INFO: statistics for "t": INFO: statistics for "t":
total file size: 24576, total data size: 18808 total file size: 49152, total data size: 18808
total row count: 5530, stripe count: 5, average rows per stripe: 1106 total row count: 5530, stripe count: 5, average rows per stripe: 1106
block count: 7, containing data for dropped columns: 0, none compressed: 5, pglz compressed: 2 block count: 7, containing data for dropped columns: 0, none compressed: 5, pglz compressed: 2
@ -188,7 +188,7 @@ INSERT INTO t SELECT 1, i / 5 FROM generate_series(1, 1500) i;
ALTER TABLE t DROP COLUMN c; ALTER TABLE t DROP COLUMN c;
VACUUM VERBOSE t; VACUUM VERBOSE t;
INFO: statistics for "t": INFO: statistics for "t":
total file size: 32768, total data size: 31372 total file size: 65536, total data size: 31372
total row count: 7030, stripe count: 6, average rows per stripe: 1171 total row count: 7030, stripe count: 6, average rows per stripe: 1171
block count: 11, containing data for dropped columns: 2, none compressed: 9, pglz compressed: 2 block count: 11, containing data for dropped columns: 2, none compressed: 9, pglz compressed: 2
@ -199,7 +199,7 @@ SET cstore.compression TO "pglz";
VACUUM FULL t; VACUUM FULL t;
VACUUM VERBOSE t; VACUUM VERBOSE t;
INFO: statistics for "t": INFO: statistics for "t":
total file size: 16384, total data size: 15728 total file size: 49152, total data size: 15728
total row count: 7030, stripe count: 4, average rows per stripe: 1757 total row count: 7030, stripe count: 4, average rows per stripe: 1757
block count: 8, containing data for dropped columns: 0, none compressed: 2, pglz compressed: 6 block count: 8, containing data for dropped columns: 0, none compressed: 2, pglz compressed: 6

View File

@ -11,7 +11,7 @@ step s1-insert:
INSERT INTO test_vacuum_vs_insert SELECT i, 2 * i FROM generate_series(1, 3) i; INSERT INTO test_vacuum_vs_insert SELECT i, 2 * i FROM generate_series(1, 3) i;
s2: INFO: statistics for "test_vacuum_vs_insert": s2: INFO: statistics for "test_vacuum_vs_insert":
total file size: 8192, total data size: 26 total file size: 24576, total data size: 26
total row count: 3, stripe count: 1, average rows per stripe: 3 total row count: 3, stripe count: 1, average rows per stripe: 3
block count: 2, containing data for dropped columns: 0, none compressed: 2, pglz compressed: 0 block count: 2, containing data for dropped columns: 0, none compressed: 2, pglz compressed: 0
@ -51,7 +51,7 @@ step s1-commit:
COMMIT; COMMIT;
s2: INFO: vacuuming "public.test_vacuum_vs_insert" s2: INFO: vacuuming "public.test_vacuum_vs_insert"
s2: INFO: "test_vacuum_vs_insert": found 0 removable, 6 nonremovable row versions in 1 pages s2: INFO: "test_vacuum_vs_insert": found 0 removable, 6 nonremovable row versions in 3 pages
DETAIL: 0 dead row versions cannot be removed yet. DETAIL: 0 dead row versions cannot be removed yet.
CPU: user: 0.00 s, system: 0.00 s, elapsed: 0.00 s. CPU: user: 0.00 s, system: 0.00 s, elapsed: 0.00 s.
step s2-vacuum-full: <... completed> step s2-vacuum-full: <... completed>

View File

@ -0,0 +1,142 @@
Parsed test spec with 2 sessions
starting permutation: s1-begin s2-begin s1-insert s2-insert s1-select s2-select s1-commit s2-commit s1-select
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s1-insert:
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(1, 3) i;
step s2-insert:
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i;
step s1-select:
SELECT * FROM test_insert_concurrency ORDER BY a;
a b
1 2
2 4
3 6
step s2-select:
SELECT * FROM test_insert_concurrency ORDER BY a;
a b
4 8
5 10
6 12
step s1-commit:
COMMIT;
step s2-commit:
COMMIT;
step s1-select:
SELECT * FROM test_insert_concurrency ORDER BY a;
a b
1 2
2 4
3 6
4 8
5 10
6 12
starting permutation: s1-begin s2-begin s1-copy s2-insert s1-select s2-select s1-commit s2-commit s1-select
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s1-copy:
COPY test_insert_concurrency(a) FROM PROGRAM 'seq 11 13';
step s2-insert:
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i;
step s1-select:
SELECT * FROM test_insert_concurrency ORDER BY a;
a b
11
12
13
step s2-select:
SELECT * FROM test_insert_concurrency ORDER BY a;
a b
4 8
5 10
6 12
step s1-commit:
COMMIT;
step s2-commit:
COMMIT;
step s1-select:
SELECT * FROM test_insert_concurrency ORDER BY a;
a b
4 8
5 10
6 12
11
12
13
starting permutation: s1-begin s2-begin s2-insert s1-copy s1-select s2-select s1-commit s2-commit s1-select
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s2-insert:
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i;
step s1-copy:
COPY test_insert_concurrency(a) FROM PROGRAM 'seq 11 13';
step s1-select:
SELECT * FROM test_insert_concurrency ORDER BY a;
a b
11
12
13
step s2-select:
SELECT * FROM test_insert_concurrency ORDER BY a;
a b
4 8
5 10
6 12
step s1-commit:
COMMIT;
step s2-commit:
COMMIT;
step s1-select:
SELECT * FROM test_insert_concurrency ORDER BY a;
a b
4 8
5 10
6 12
11
12
13

View File

@ -0,0 +1,67 @@
setup
{
CREATE TABLE test_insert_concurrency (a int, b int) USING cstore_tableam;
}
teardown
{
DROP TABLE IF EXISTS test_insert_concurrency CASCADE;
}
session "s1"
step "s1-begin"
{
BEGIN;
}
step "s1-insert"
{
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(1, 3) i;
}
step "s1-copy"
{
COPY test_insert_concurrency(a) FROM PROGRAM 'seq 11 13';
}
step "s1-select"
{
SELECT * FROM test_insert_concurrency ORDER BY a;
}
step "s1-commit"
{
COMMIT;
}
session "s2"
step "s2-begin"
{
BEGIN;
}
step "s2-insert"
{
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i;
}
step "s2-select"
{
SELECT * FROM test_insert_concurrency ORDER BY a;
}
step "s2-commit"
{
COMMIT;
}
# writes shouldn't block writes or reads
permutation "s1-begin" "s2-begin" "s1-insert" "s2-insert" "s1-select" "s2-select" "s1-commit" "s2-commit" "s1-select"
# copy vs insert
permutation "s1-begin" "s2-begin" "s1-copy" "s2-insert" "s1-select" "s2-select" "s1-commit" "s2-commit" "s1-select"
# insert vs copy
permutation "s1-begin" "s2-begin" "s2-insert" "s1-copy" "s1-select" "s2-select" "s1-commit" "s2-commit" "s1-select"