mirror of https://github.com/citusdata/citus.git
Implement concurrent writes
parent
4303758a28
commit
c92ea1de96
2
Makefile
2
Makefile
|
@ -55,7 +55,7 @@ ifeq ($(USE_TABLEAM),yes)
|
|||
REGRESS += am_create am_load am_query am_analyze am_data_types am_functions \
|
||||
am_drop am_insert am_copyto am_alter am_rollback am_truncate am_vacuum am_clean \
|
||||
am_block_filtering am_join am_trigger
|
||||
ISOLATION += am_vacuum_vs_insert
|
||||
ISOLATION += am_write_concurrency am_vacuum_vs_insert
|
||||
endif
|
||||
|
||||
ifeq ($(enable_coverage),yes)
|
||||
|
|
33
cstore.h
33
cstore.h
|
@ -218,15 +218,12 @@ typedef struct TableReadState
|
|||
/* TableWriteState represents state of a cstore file write operation. */
|
||||
typedef struct TableWriteState
|
||||
{
|
||||
DataFileMetadata *datafileMetadata;
|
||||
CompressionType compressionType;
|
||||
TupleDesc tupleDescriptor;
|
||||
FmgrInfo **comparisonFunctionArray;
|
||||
uint64 currentFileOffset;
|
||||
Relation relation;
|
||||
|
||||
MemoryContext stripeWriteContext;
|
||||
uint64 currentStripeId;
|
||||
StripeBuffers *stripeBuffers;
|
||||
StripeSkipList *stripeSkipList;
|
||||
uint32 stripeMaxRowCount;
|
||||
|
@ -284,9 +281,11 @@ extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressio
|
|||
/* cstore_metadata_tables.c */
|
||||
extern void DeleteDataFileMetadataRowIfExists(Oid relfilenode);
|
||||
extern void InitCStoreDataFileMetadata(Oid relfilenode, int blockRowCount);
|
||||
extern void InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe);
|
||||
extern DataFileMetadata * ReadDataFileMetadata(Oid relfilenode, bool missingOk);
|
||||
extern uint64 GetHighestUsedAddress(Oid relfilenode);
|
||||
extern StripeMetadata ReserveStripe(Relation rel, uint64 size,
|
||||
uint64 rowCount, uint64 columnCount,
|
||||
uint64 blockCount, uint64 blockRowCount);
|
||||
extern void SaveStripeSkipList(Oid relfilenode, uint64 stripe,
|
||||
StripeSkipList *stripeSkipList,
|
||||
TupleDesc tupleDescriptor);
|
||||
|
@ -317,4 +316,30 @@ logical_to_smgr(uint64 logicalOffset)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* Map a physical page adnd offset address to a logical address.
|
||||
*/
|
||||
static inline uint64
|
||||
smgr_to_logical(SmgrAddr addr)
|
||||
{
|
||||
uint64 bytes_per_page = BLCKSZ - SizeOfPageHeaderData;
|
||||
return bytes_per_page * addr.blockno + addr.offset - SizeOfPageHeaderData;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Get the first usable address of next block.
|
||||
*/
|
||||
static inline SmgrAddr
|
||||
next_block_start(SmgrAddr addr)
|
||||
{
|
||||
SmgrAddr result = {
|
||||
.blockno = addr.blockno + 1,
|
||||
.offset = SizeOfPageHeaderData
|
||||
};
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
#endif /* CSTORE_H */
|
||||
|
|
12
cstore_fdw.c
12
cstore_fdw.c
|
@ -494,10 +494,10 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString)
|
|||
Assert(copyStatement->relation != NULL);
|
||||
|
||||
/*
|
||||
* Open and lock the relation. We acquire ShareUpdateExclusiveLock to allow
|
||||
* concurrent reads, but block concurrent writes.
|
||||
* Open and lock the relation. We acquire RowExclusiveLock to allow
|
||||
* concurrent reads and writes.
|
||||
*/
|
||||
relation = cstore_fdw_openrv(copyStatement->relation, ShareUpdateExclusiveLock);
|
||||
relation = cstore_fdw_openrv(copyStatement->relation, RowExclusiveLock);
|
||||
relationId = RelationGetRelid(relation);
|
||||
|
||||
/* allocate column values and nulls arrays */
|
||||
|
@ -572,7 +572,7 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString)
|
|||
/* end read/write sessions and close the relation */
|
||||
EndCopyFrom(copyState);
|
||||
CStoreEndWrite(writeState);
|
||||
heap_close(relation, ShareUpdateExclusiveLock);
|
||||
heap_close(relation, RowExclusiveLock);
|
||||
|
||||
return processedRowCount;
|
||||
}
|
||||
|
@ -2015,7 +2015,7 @@ CStoreBeginForeignInsert(ModifyTableState *modifyTableState, ResultRelInfo *rela
|
|||
Relation relation = NULL;
|
||||
|
||||
foreignTableOid = RelationGetRelid(relationInfo->ri_RelationDesc);
|
||||
relation = cstore_fdw_open(foreignTableOid, ShareUpdateExclusiveLock);
|
||||
relation = cstore_fdw_open(foreignTableOid, RowExclusiveLock);
|
||||
cstoreOptions = CStoreGetOptions(foreignTableOid);
|
||||
tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc);
|
||||
|
||||
|
@ -2086,7 +2086,7 @@ CStoreEndForeignInsert(EState *executorState, ResultRelInfo *relationInfo)
|
|||
Relation relation = writeState->relation;
|
||||
|
||||
CStoreEndWrite(writeState);
|
||||
heap_close(relation, ShareUpdateExclusiveLock);
|
||||
heap_close(relation, RowExclusiveLock);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -31,6 +31,8 @@
|
|||
#include "lib/stringinfo.h"
|
||||
#include "port.h"
|
||||
#include "storage/fd.h"
|
||||
#include "storage/lmgr.h"
|
||||
#include "storage/smgr.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/fmgroids.h"
|
||||
#include "utils/memutils.h"
|
||||
|
@ -43,6 +45,10 @@ typedef struct
|
|||
EState *estate;
|
||||
} ModifyState;
|
||||
|
||||
static void InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe);
|
||||
static void GetHighestUsedAddressAndId(Oid relfilenode,
|
||||
uint64 *highestUsedAddress,
|
||||
uint64 *highestUsedId);
|
||||
static List * ReadDataFileStripeList(Oid relfilenode, Snapshot snapshot);
|
||||
static Oid CStoreStripesRelationId(void);
|
||||
static Oid CStoreStripesIndexRelationId(void);
|
||||
|
@ -311,7 +317,7 @@ ReadStripeSkipList(Oid relfilenode, uint64 stripe, TupleDesc tupleDescriptor,
|
|||
/*
|
||||
* InsertStripeMetadataRow adds a row to cstore_stripes.
|
||||
*/
|
||||
void
|
||||
static void
|
||||
InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe)
|
||||
{
|
||||
bool nulls[Natts_cstore_stripes] = { 0 };
|
||||
|
@ -330,7 +336,9 @@ InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe)
|
|||
Relation cstoreStripes = heap_open(cstoreStripesOid, RowExclusiveLock);
|
||||
|
||||
ModifyState *modifyState = StartModifyRelation(cstoreStripes);
|
||||
|
||||
InsertTupleAndEnforceConstraints(modifyState, values, nulls);
|
||||
|
||||
FinishModifyRelation(modifyState);
|
||||
|
||||
CommandCounterIncrement();
|
||||
|
@ -376,6 +384,23 @@ uint64
|
|||
GetHighestUsedAddress(Oid relfilenode)
|
||||
{
|
||||
uint64 highestUsedAddress = 0;
|
||||
uint64 highestUsedId = 0;
|
||||
|
||||
GetHighestUsedAddressAndId(relfilenode, &highestUsedAddress, &highestUsedId);
|
||||
|
||||
return highestUsedAddress;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetHighestUsedAddressAndId returns the highest used address and id for
|
||||
* the given relfilenode across all active and inactive transactions.
|
||||
*/
|
||||
static void
|
||||
GetHighestUsedAddressAndId(Oid relfilenode,
|
||||
uint64 *highestUsedAddress,
|
||||
uint64 *highestUsedId)
|
||||
{
|
||||
ListCell *stripeMetadataCell = NULL;
|
||||
List *stripeMetadataList = NIL;
|
||||
|
||||
|
@ -384,14 +409,83 @@ GetHighestUsedAddress(Oid relfilenode)
|
|||
|
||||
stripeMetadataList = ReadDataFileStripeList(relfilenode, &SnapshotDirty);
|
||||
|
||||
*highestUsedId = 0;
|
||||
*highestUsedAddress = 0;
|
||||
|
||||
foreach(stripeMetadataCell, stripeMetadataList)
|
||||
{
|
||||
StripeMetadata *stripe = lfirst(stripeMetadataCell);
|
||||
uint64 lastByte = stripe->fileOffset + stripe->dataLength - 1;
|
||||
highestUsedAddress = Max(highestUsedAddress, lastByte);
|
||||
*highestUsedAddress = Max(*highestUsedAddress, lastByte);
|
||||
*highestUsedId = Max(*highestUsedId, stripe->id);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ReserveStripe reserves and stripe of given size for the given relation,
|
||||
* and inserts it into cstore_stripes. It is guaranteed that concurrent
|
||||
* writes won't overwrite the returned stripe.
|
||||
*/
|
||||
StripeMetadata
|
||||
ReserveStripe(Relation rel, uint64 sizeBytes,
|
||||
uint64 rowCount, uint64 columnCount,
|
||||
uint64 blockCount, uint64 blockRowCount)
|
||||
{
|
||||
StripeMetadata stripe = { 0 };
|
||||
Oid relfilenode = InvalidOid;
|
||||
uint64 currLogicalHigh = 0;
|
||||
SmgrAddr currSmgrHigh;
|
||||
uint64 nblocks = 0;
|
||||
uint64 resLogicalStart = 0;
|
||||
SmgrAddr resSmgrStart;
|
||||
uint64 resLogicalEnd = 0;
|
||||
SmgrAddr resSmgrEnd;
|
||||
uint64 highestId = 0;
|
||||
|
||||
/*
|
||||
* We take ShareUpdateExclusiveLock here, so two space
|
||||
* reservations conflict, space reservation <-> vacuum
|
||||
* conflict, but space reservation doesn't conflict with
|
||||
* reads & writes.
|
||||
*/
|
||||
LockRelation(rel, ShareUpdateExclusiveLock);
|
||||
|
||||
relfilenode = rel->rd_node.relNode;
|
||||
GetHighestUsedAddressAndId(relfilenode, &currLogicalHigh, &highestId);
|
||||
currSmgrHigh = logical_to_smgr(currLogicalHigh);
|
||||
|
||||
resSmgrStart = next_block_start(currSmgrHigh);
|
||||
resLogicalStart = smgr_to_logical(resSmgrStart);
|
||||
|
||||
resLogicalEnd = resLogicalStart + sizeBytes - 1;
|
||||
resSmgrEnd = logical_to_smgr(resLogicalEnd);
|
||||
|
||||
RelationOpenSmgr(rel);
|
||||
nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
||||
|
||||
while (resSmgrEnd.blockno >= nblocks)
|
||||
{
|
||||
Buffer newBuffer = ReadBuffer(rel, P_NEW);
|
||||
ReleaseBuffer(newBuffer);
|
||||
nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
||||
}
|
||||
|
||||
return highestUsedAddress;
|
||||
RelationCloseSmgr(rel);
|
||||
|
||||
stripe.fileOffset = resLogicalStart;
|
||||
stripe.dataLength = sizeBytes;
|
||||
stripe.blockCount = blockCount;
|
||||
stripe.blockRowCount = blockRowCount;
|
||||
stripe.columnCount = columnCount;
|
||||
stripe.rowCount = rowCount;
|
||||
stripe.id = highestId + 1;
|
||||
|
||||
InsertStripeMetadataRow(relfilenode, &stripe);
|
||||
|
||||
UnlockRelation(rel, ShareUpdateExclusiveLock);
|
||||
|
||||
return stripe;
|
||||
}
|
||||
|
||||
|
||||
|
@ -419,7 +513,7 @@ ReadDataFileStripeList(Oid relfilenode, Snapshot snapshot)
|
|||
index = index_open(CStoreStripesIndexRelationId(), AccessShareLock);
|
||||
tupleDescriptor = RelationGetDescr(cstoreStripes);
|
||||
|
||||
scanDescriptor = systable_beginscan_ordered(cstoreStripes, index, NULL, 1,
|
||||
scanDescriptor = systable_beginscan_ordered(cstoreStripes, index, snapshot, 1,
|
||||
scanKey);
|
||||
|
||||
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
|
||||
|
@ -593,6 +687,7 @@ InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values, bool *nulls)
|
|||
#if PG_VERSION_NUM >= 120000
|
||||
TupleTableSlot *slot = ExecInitExtraTupleSlot(state->estate, tupleDescriptor,
|
||||
&TTSOpsHeapTuple);
|
||||
|
||||
ExecStoreHeapTuple(tuple, slot, false);
|
||||
#else
|
||||
TupleTableSlot *slot = ExecInitExtraTupleSlot(state->estate, tupleDescriptor);
|
||||
|
|
|
@ -120,8 +120,6 @@ ResetCStoreMemoryContext()
|
|||
static void
|
||||
cstore_init_write_state(Relation relation)
|
||||
{
|
||||
/*TODO: upgrade lock to serialize writes */
|
||||
|
||||
if (CStoreWriteState != NULL)
|
||||
{
|
||||
/* TODO: consider whether it's possible for a new write to start */
|
||||
|
|
128
cstore_writer.c
128
cstore_writer.c
|
@ -33,7 +33,7 @@ static StripeBuffers * CreateEmptyStripeBuffers(uint32 stripeMaxRowCount,
|
|||
static StripeSkipList * CreateEmptyStripeSkipList(uint32 stripeMaxRowCount,
|
||||
uint32 blockRowCount,
|
||||
uint32 columnCount);
|
||||
static StripeMetadata FlushStripe(TableWriteState *writeState);
|
||||
static void FlushStripe(TableWriteState *writeState);
|
||||
static StringInfo SerializeBoolArray(bool *boolArray, uint32 boolArrayLength);
|
||||
static void SerializeSingleDatum(StringInfo datumBuffer, Datum datum,
|
||||
bool datumTypeByValue, int datumTypeLength,
|
||||
|
@ -45,8 +45,6 @@ static void UpdateBlockSkipNodeMinMax(ColumnBlockSkipNode *blockSkipNode,
|
|||
int columnTypeLength, Oid columnCollation,
|
||||
FmgrInfo *comparisonFunction);
|
||||
static Datum DatumCopy(Datum datum, bool datumTypeByValue, int datumTypeLength);
|
||||
static void AppendStripeMetadata(DataFileMetadata *datafileMetadata,
|
||||
StripeMetadata stripeMetadata);
|
||||
static StringInfo CopyStringInfo(StringInfo sourceString);
|
||||
|
||||
|
||||
|
@ -64,34 +62,12 @@ CStoreBeginWrite(Relation relation,
|
|||
TupleDesc tupleDescriptor)
|
||||
{
|
||||
TableWriteState *writeState = NULL;
|
||||
DataFileMetadata *datafileMetadata = NULL;
|
||||
FmgrInfo **comparisonFunctionArray = NULL;
|
||||
MemoryContext stripeWriteContext = NULL;
|
||||
uint64 currentFileOffset = 0;
|
||||
uint32 columnCount = 0;
|
||||
uint32 columnIndex = 0;
|
||||
bool *columnMaskArray = NULL;
|
||||
BlockData *blockData = NULL;
|
||||
uint64 currentStripeId = 0;
|
||||
Oid relNode = relation->rd_node.relNode;
|
||||
|
||||
datafileMetadata = ReadDataFileMetadata(relNode, false);
|
||||
|
||||
/*
|
||||
* If stripeMetadataList is not empty, jump to the position right after
|
||||
* the last position.
|
||||
*/
|
||||
if (datafileMetadata->stripeMetadataList != NIL)
|
||||
{
|
||||
StripeMetadata *lastStripe = NULL;
|
||||
uint64 lastStripeSize = 0;
|
||||
|
||||
lastStripe = llast(datafileMetadata->stripeMetadataList);
|
||||
lastStripeSize += lastStripe->dataLength;
|
||||
|
||||
currentFileOffset = lastStripe->fileOffset + lastStripeSize;
|
||||
currentStripeId = lastStripe->id + 1;
|
||||
}
|
||||
|
||||
/* get comparison function pointers for each of the columns */
|
||||
columnCount = tupleDescriptor->natts;
|
||||
|
@ -129,19 +105,16 @@ CStoreBeginWrite(Relation relation,
|
|||
|
||||
writeState = palloc0(sizeof(TableWriteState));
|
||||
writeState->relation = relation;
|
||||
writeState->datafileMetadata = datafileMetadata;
|
||||
writeState->compressionType = compressionType;
|
||||
writeState->stripeMaxRowCount = stripeMaxRowCount;
|
||||
writeState->blockRowCount = blockRowCount;
|
||||
writeState->tupleDescriptor = tupleDescriptor;
|
||||
writeState->currentFileOffset = currentFileOffset;
|
||||
writeState->comparisonFunctionArray = comparisonFunctionArray;
|
||||
writeState->stripeBuffers = NULL;
|
||||
writeState->stripeSkipList = NULL;
|
||||
writeState->stripeWriteContext = stripeWriteContext;
|
||||
writeState->blockData = blockData;
|
||||
writeState->compressionBuffer = NULL;
|
||||
writeState->currentStripeId = currentStripeId;
|
||||
|
||||
return writeState;
|
||||
}
|
||||
|
@ -164,7 +137,6 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul
|
|||
StripeBuffers *stripeBuffers = writeState->stripeBuffers;
|
||||
StripeSkipList *stripeSkipList = writeState->stripeSkipList;
|
||||
uint32 columnCount = writeState->tupleDescriptor->natts;
|
||||
DataFileMetadata *datafileMetadata = writeState->datafileMetadata;
|
||||
const uint32 blockRowCount = writeState->blockRowCount;
|
||||
BlockData *blockData = writeState->blockData;
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext);
|
||||
|
@ -238,28 +210,14 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul
|
|||
stripeBuffers->rowCount++;
|
||||
if (stripeBuffers->rowCount >= writeState->stripeMaxRowCount)
|
||||
{
|
||||
StripeMetadata stripeMetadata = FlushStripe(writeState);
|
||||
MemoryContextReset(writeState->stripeWriteContext);
|
||||
|
||||
writeState->currentStripeId++;
|
||||
FlushStripe(writeState);
|
||||
|
||||
/* set stripe data and skip list to NULL so they are recreated next time */
|
||||
writeState->stripeBuffers = NULL;
|
||||
writeState->stripeSkipList = NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Append stripeMetadata in old context so next MemoryContextReset
|
||||
* doesn't free it.
|
||||
*/
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
InsertStripeMetadataRow(writeState->relation->rd_node.relNode,
|
||||
&stripeMetadata);
|
||||
AppendStripeMetadata(datafileMetadata, stripeMetadata);
|
||||
}
|
||||
else
|
||||
{
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
}
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
}
|
||||
|
||||
|
||||
|
@ -278,17 +236,13 @@ CStoreEndWrite(TableWriteState *writeState)
|
|||
{
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext);
|
||||
|
||||
StripeMetadata stripeMetadata = FlushStripe(writeState);
|
||||
FlushStripe(writeState);
|
||||
MemoryContextReset(writeState->stripeWriteContext);
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
InsertStripeMetadataRow(writeState->relation->rd_node.relNode,
|
||||
&stripeMetadata);
|
||||
AppendStripeMetadata(writeState->datafileMetadata, stripeMetadata);
|
||||
}
|
||||
|
||||
MemoryContextDelete(writeState->stripeWriteContext);
|
||||
list_free_deep(writeState->datafileMetadata->stripeMetadataList);
|
||||
pfree(writeState->comparisonFunctionArray);
|
||||
FreeBlockData(writeState->blockData);
|
||||
pfree(writeState);
|
||||
|
@ -366,11 +320,9 @@ CreateEmptyStripeSkipList(uint32 stripeMaxRowCount, uint32 blockRowCount,
|
|||
|
||||
|
||||
static void
|
||||
WriteToSmgr(TableWriteState *writeState, char *data, uint32 dataLength)
|
||||
WriteToSmgr(Relation rel, uint64 logicalOffset, char *data, uint32 dataLength)
|
||||
{
|
||||
uint64 logicalOffset = writeState->currentFileOffset;
|
||||
uint64 remaining = dataLength;
|
||||
Relation rel = writeState->relation;
|
||||
Buffer buffer;
|
||||
|
||||
while (remaining > 0)
|
||||
|
@ -383,14 +335,7 @@ WriteToSmgr(TableWriteState *writeState, char *data, uint32 dataLength)
|
|||
|
||||
RelationOpenSmgr(rel);
|
||||
nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
||||
|
||||
while (addr.blockno >= nblocks)
|
||||
{
|
||||
Buffer newBuffer = ReadBuffer(rel, P_NEW);
|
||||
ReleaseBuffer(newBuffer);
|
||||
nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
||||
}
|
||||
|
||||
Assert(addr.blockno < nblocks);
|
||||
RelationCloseSmgr(rel);
|
||||
|
||||
buffer = ReadBuffer(rel, addr.blockno);
|
||||
|
@ -459,7 +404,7 @@ WriteToSmgr(TableWriteState *writeState, char *data, uint32 dataLength)
|
|||
* the function creates the skip list and footer buffers. Finally, the function
|
||||
* flushes the skip list, data, and footer buffers to the file.
|
||||
*/
|
||||
static StripeMetadata
|
||||
static void
|
||||
FlushStripe(TableWriteState *writeState)
|
||||
{
|
||||
StripeMetadata stripeMetadata = { 0 };
|
||||
|
@ -474,8 +419,9 @@ FlushStripe(TableWriteState *writeState)
|
|||
uint32 blockRowCount = writeState->blockRowCount;
|
||||
uint32 lastBlockIndex = stripeBuffers->rowCount / blockRowCount;
|
||||
uint32 lastBlockRowCount = stripeBuffers->rowCount % blockRowCount;
|
||||
uint64 initialFileOffset = writeState->currentFileOffset;
|
||||
uint64 currentFileOffset = 0;
|
||||
uint64 stripeSize = 0;
|
||||
uint64 stripeRowCount = 0;
|
||||
|
||||
/*
|
||||
* check if the last block needs serialization , the last block was not serialized
|
||||
|
@ -520,6 +466,18 @@ FlushStripe(TableWriteState *writeState)
|
|||
}
|
||||
}
|
||||
|
||||
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
|
||||
{
|
||||
stripeRowCount +=
|
||||
stripeSkipList->blockSkipNodeArray[0][blockIndex].rowCount;
|
||||
}
|
||||
|
||||
stripeMetadata = ReserveStripe(writeState->relation, stripeSize,
|
||||
stripeRowCount, columnCount, blockCount,
|
||||
blockRowCount);
|
||||
|
||||
currentFileOffset = stripeMetadata.fileOffset;
|
||||
|
||||
/*
|
||||
* Each stripe has only one section:
|
||||
* Data section, in which we store data for each column continuously.
|
||||
|
@ -541,8 +499,9 @@ FlushStripe(TableWriteState *writeState)
|
|||
columnBuffers->blockBuffersArray[blockIndex];
|
||||
StringInfo existsBuffer = blockBuffers->existsBuffer;
|
||||
|
||||
WriteToSmgr(writeState, existsBuffer->data, existsBuffer->len);
|
||||
writeState->currentFileOffset += existsBuffer->len;
|
||||
WriteToSmgr(writeState->relation, currentFileOffset,
|
||||
existsBuffer->data, existsBuffer->len);
|
||||
currentFileOffset += existsBuffer->len;
|
||||
}
|
||||
|
||||
for (blockIndex = 0; blockIndex < stripeSkipList->blockCount; blockIndex++)
|
||||
|
@ -551,30 +510,16 @@ FlushStripe(TableWriteState *writeState)
|
|||
columnBuffers->blockBuffersArray[blockIndex];
|
||||
StringInfo valueBuffer = blockBuffers->valueBuffer;
|
||||
|
||||
WriteToSmgr(writeState, valueBuffer->data, valueBuffer->len);
|
||||
writeState->currentFileOffset += valueBuffer->len;
|
||||
WriteToSmgr(writeState->relation, currentFileOffset,
|
||||
valueBuffer->data, valueBuffer->len);
|
||||
currentFileOffset += valueBuffer->len;
|
||||
}
|
||||
}
|
||||
|
||||
/* create skip list and footer buffers */
|
||||
SaveStripeSkipList(writeState->relation->rd_node.relNode,
|
||||
writeState->currentStripeId,
|
||||
stripeMetadata.id,
|
||||
stripeSkipList, tupleDescriptor);
|
||||
|
||||
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
|
||||
{
|
||||
stripeMetadata.rowCount +=
|
||||
stripeSkipList->blockSkipNodeArray[0][blockIndex].rowCount;
|
||||
}
|
||||
|
||||
stripeMetadata.fileOffset = initialFileOffset;
|
||||
stripeMetadata.dataLength = stripeSize;
|
||||
stripeMetadata.id = writeState->currentStripeId;
|
||||
stripeMetadata.blockCount = blockCount;
|
||||
stripeMetadata.blockRowCount = writeState->blockRowCount;
|
||||
stripeMetadata.columnCount = columnCount;
|
||||
|
||||
return stripeMetadata;
|
||||
}
|
||||
|
||||
|
||||
|
@ -797,21 +742,6 @@ DatumCopy(Datum datum, bool datumTypeByValue, int datumTypeLength)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* AppendStripeMetadata adds a copy of given stripeMetadata to the given
|
||||
* table footer's stripeMetadataList.
|
||||
*/
|
||||
static void
|
||||
AppendStripeMetadata(DataFileMetadata *datafileMetadata, StripeMetadata stripeMetadata)
|
||||
{
|
||||
StripeMetadata *stripeMetadataCopy = palloc0(sizeof(StripeMetadata));
|
||||
memcpy(stripeMetadataCopy, &stripeMetadata, sizeof(StripeMetadata));
|
||||
|
||||
datafileMetadata->stripeMetadataList = lappend(datafileMetadata->stripeMetadataList,
|
||||
stripeMetadataCopy);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CopyStringInfo creates a deep copy of given source string allocating only needed
|
||||
* amount of memory.
|
||||
|
|
|
@ -68,24 +68,24 @@ ALTER TABLE t DROP COLUMN a;
|
|||
SELECT stripe, attr, block, minimum_value IS NULL, maximum_value IS NULL FROM cstore.cstore_skipnodes a, pg_class b WHERE a.relfilenode=b.relfilenode AND b.relname='t' ORDER BY 1, 2, 3;
|
||||
stripe | attr | block | ?column? | ?column?
|
||||
--------+------+-------+----------+----------
|
||||
0 | 1 | 0 | f | f
|
||||
0 | 2 | 0 | f | f
|
||||
1 | 1 | 0 | f | f
|
||||
1 | 2 | 0 | f | f
|
||||
2 | 1 | 0 | f | f
|
||||
2 | 2 | 0 | f | f
|
||||
3 | 1 | 0 | f | f
|
||||
3 | 2 | 0 | f | f
|
||||
(6 rows)
|
||||
|
||||
VACUUM FULL t;
|
||||
SELECT stripe, attr, block, minimum_value IS NULL, maximum_value IS NULL FROM cstore.cstore_skipnodes a, pg_class b WHERE a.relfilenode=b.relfilenode AND b.relname='t' ORDER BY 1, 2, 3;
|
||||
stripe | attr | block | ?column? | ?column?
|
||||
--------+------+-------+----------+----------
|
||||
0 | 1 | 0 | t | t
|
||||
0 | 2 | 0 | f | f
|
||||
1 | 1 | 0 | t | t
|
||||
1 | 2 | 0 | f | f
|
||||
2 | 1 | 0 | t | t
|
||||
2 | 2 | 0 | f | f
|
||||
3 | 1 | 0 | t | t
|
||||
3 | 2 | 0 | f | f
|
||||
(6 rows)
|
||||
|
||||
-- Make sure we cleaned-up the transient table metadata after VACUUM FULL commands
|
||||
|
@ -107,14 +107,14 @@ SELECT count(*) FROM t;
|
|||
SELECT pg_size_pretty(pg_relation_size('t'));
|
||||
pg_size_pretty
|
||||
----------------
|
||||
16 kB
|
||||
32 kB
|
||||
(1 row)
|
||||
|
||||
INSERT INTO t SELECT i FROM generate_series(1, 10000) i;
|
||||
SELECT pg_size_pretty(pg_relation_size('t'));
|
||||
pg_size_pretty
|
||||
----------------
|
||||
56 kB
|
||||
112 kB
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM t;
|
||||
|
@ -129,23 +129,23 @@ ROLLBACK TO SAVEPOINT s1;
|
|||
SELECT pg_size_pretty(pg_relation_size('t'));
|
||||
pg_size_pretty
|
||||
----------------
|
||||
56 kB
|
||||
112 kB
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
-- vacuum should truncate the relation to the usable space
|
||||
VACUUM VERBOSE t;
|
||||
INFO: statistics for "t":
|
||||
total file size: 57344, total data size: 10754
|
||||
total file size: 114688, total data size: 10754
|
||||
total row count: 2530, stripe count: 3, average rows per stripe: 843
|
||||
block count: 3, containing data for dropped columns: 0, none compressed: 3, pglz compressed: 0
|
||||
|
||||
INFO: "t": truncated 7 to 2 pages
|
||||
INFO: "t": truncated 14 to 4 pages
|
||||
DETAIL: CPU: user: 0.00 s, system: 0.00 s, elapsed: 0.00 s
|
||||
SELECT pg_size_pretty(pg_relation_size('t'));
|
||||
pg_size_pretty
|
||||
----------------
|
||||
16 kB
|
||||
32 kB
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM t;
|
||||
|
@ -172,7 +172,7 @@ INSERT INTO t SELECT i / 5 FROM generate_series(1, 1500) i;
|
|||
COMMIT;
|
||||
VACUUM VERBOSE t;
|
||||
INFO: statistics for "t":
|
||||
total file size: 24576, total data size: 18808
|
||||
total file size: 49152, total data size: 18808
|
||||
total row count: 5530, stripe count: 5, average rows per stripe: 1106
|
||||
block count: 7, containing data for dropped columns: 0, none compressed: 5, pglz compressed: 2
|
||||
|
||||
|
@ -188,7 +188,7 @@ INSERT INTO t SELECT 1, i / 5 FROM generate_series(1, 1500) i;
|
|||
ALTER TABLE t DROP COLUMN c;
|
||||
VACUUM VERBOSE t;
|
||||
INFO: statistics for "t":
|
||||
total file size: 32768, total data size: 31372
|
||||
total file size: 65536, total data size: 31372
|
||||
total row count: 7030, stripe count: 6, average rows per stripe: 1171
|
||||
block count: 11, containing data for dropped columns: 2, none compressed: 9, pglz compressed: 2
|
||||
|
||||
|
@ -199,7 +199,7 @@ SET cstore.compression TO "pglz";
|
|||
VACUUM FULL t;
|
||||
VACUUM VERBOSE t;
|
||||
INFO: statistics for "t":
|
||||
total file size: 16384, total data size: 15728
|
||||
total file size: 49152, total data size: 15728
|
||||
total row count: 7030, stripe count: 4, average rows per stripe: 1757
|
||||
block count: 8, containing data for dropped columns: 0, none compressed: 2, pglz compressed: 6
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@ step s1-insert:
|
|||
INSERT INTO test_vacuum_vs_insert SELECT i, 2 * i FROM generate_series(1, 3) i;
|
||||
|
||||
s2: INFO: statistics for "test_vacuum_vs_insert":
|
||||
total file size: 8192, total data size: 26
|
||||
total file size: 24576, total data size: 26
|
||||
total row count: 3, stripe count: 1, average rows per stripe: 3
|
||||
block count: 2, containing data for dropped columns: 0, none compressed: 2, pglz compressed: 0
|
||||
|
||||
|
@ -51,7 +51,7 @@ step s1-commit:
|
|||
COMMIT;
|
||||
|
||||
s2: INFO: vacuuming "public.test_vacuum_vs_insert"
|
||||
s2: INFO: "test_vacuum_vs_insert": found 0 removable, 6 nonremovable row versions in 1 pages
|
||||
s2: INFO: "test_vacuum_vs_insert": found 0 removable, 6 nonremovable row versions in 3 pages
|
||||
DETAIL: 0 dead row versions cannot be removed yet.
|
||||
CPU: user: 0.00 s, system: 0.00 s, elapsed: 0.00 s.
|
||||
step s2-vacuum-full: <... completed>
|
||||
|
|
|
@ -0,0 +1,142 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: s1-begin s2-begin s1-insert s2-insert s1-select s2-select s1-commit s2-commit s1-select
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-insert:
|
||||
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(1, 3) i;
|
||||
|
||||
step s2-insert:
|
||||
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i;
|
||||
|
||||
step s1-select:
|
||||
SELECT * FROM test_insert_concurrency ORDER BY a;
|
||||
|
||||
a b
|
||||
|
||||
1 2
|
||||
2 4
|
||||
3 6
|
||||
step s2-select:
|
||||
SELECT * FROM test_insert_concurrency ORDER BY a;
|
||||
|
||||
a b
|
||||
|
||||
4 8
|
||||
5 10
|
||||
6 12
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s1-select:
|
||||
SELECT * FROM test_insert_concurrency ORDER BY a;
|
||||
|
||||
a b
|
||||
|
||||
1 2
|
||||
2 4
|
||||
3 6
|
||||
4 8
|
||||
5 10
|
||||
6 12
|
||||
|
||||
starting permutation: s1-begin s2-begin s1-copy s2-insert s1-select s2-select s1-commit s2-commit s1-select
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-copy:
|
||||
COPY test_insert_concurrency(a) FROM PROGRAM 'seq 11 13';
|
||||
|
||||
step s2-insert:
|
||||
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i;
|
||||
|
||||
step s1-select:
|
||||
SELECT * FROM test_insert_concurrency ORDER BY a;
|
||||
|
||||
a b
|
||||
|
||||
11
|
||||
12
|
||||
13
|
||||
step s2-select:
|
||||
SELECT * FROM test_insert_concurrency ORDER BY a;
|
||||
|
||||
a b
|
||||
|
||||
4 8
|
||||
5 10
|
||||
6 12
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s1-select:
|
||||
SELECT * FROM test_insert_concurrency ORDER BY a;
|
||||
|
||||
a b
|
||||
|
||||
4 8
|
||||
5 10
|
||||
6 12
|
||||
11
|
||||
12
|
||||
13
|
||||
|
||||
starting permutation: s1-begin s2-begin s2-insert s1-copy s1-select s2-select s1-commit s2-commit s1-select
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-insert:
|
||||
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i;
|
||||
|
||||
step s1-copy:
|
||||
COPY test_insert_concurrency(a) FROM PROGRAM 'seq 11 13';
|
||||
|
||||
step s1-select:
|
||||
SELECT * FROM test_insert_concurrency ORDER BY a;
|
||||
|
||||
a b
|
||||
|
||||
11
|
||||
12
|
||||
13
|
||||
step s2-select:
|
||||
SELECT * FROM test_insert_concurrency ORDER BY a;
|
||||
|
||||
a b
|
||||
|
||||
4 8
|
||||
5 10
|
||||
6 12
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s1-select:
|
||||
SELECT * FROM test_insert_concurrency ORDER BY a;
|
||||
|
||||
a b
|
||||
|
||||
4 8
|
||||
5 10
|
||||
6 12
|
||||
11
|
||||
12
|
||||
13
|
|
@ -0,0 +1,67 @@
|
|||
setup
|
||||
{
|
||||
CREATE TABLE test_insert_concurrency (a int, b int) USING cstore_tableam;
|
||||
}
|
||||
|
||||
teardown
|
||||
{
|
||||
DROP TABLE IF EXISTS test_insert_concurrency CASCADE;
|
||||
}
|
||||
|
||||
session "s1"
|
||||
|
||||
step "s1-begin"
|
||||
{
|
||||
BEGIN;
|
||||
}
|
||||
|
||||
step "s1-insert"
|
||||
{
|
||||
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(1, 3) i;
|
||||
}
|
||||
|
||||
step "s1-copy"
|
||||
{
|
||||
COPY test_insert_concurrency(a) FROM PROGRAM 'seq 11 13';
|
||||
}
|
||||
|
||||
step "s1-select"
|
||||
{
|
||||
SELECT * FROM test_insert_concurrency ORDER BY a;
|
||||
}
|
||||
|
||||
step "s1-commit"
|
||||
{
|
||||
COMMIT;
|
||||
}
|
||||
|
||||
session "s2"
|
||||
|
||||
step "s2-begin"
|
||||
{
|
||||
BEGIN;
|
||||
}
|
||||
|
||||
step "s2-insert"
|
||||
{
|
||||
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i;
|
||||
}
|
||||
|
||||
step "s2-select"
|
||||
{
|
||||
SELECT * FROM test_insert_concurrency ORDER BY a;
|
||||
}
|
||||
|
||||
step "s2-commit"
|
||||
{
|
||||
COMMIT;
|
||||
}
|
||||
|
||||
# writes shouldn't block writes or reads
|
||||
permutation "s1-begin" "s2-begin" "s1-insert" "s2-insert" "s1-select" "s2-select" "s1-commit" "s2-commit" "s1-select"
|
||||
|
||||
# copy vs insert
|
||||
permutation "s1-begin" "s2-begin" "s1-copy" "s2-insert" "s1-select" "s2-select" "s1-commit" "s2-commit" "s1-select"
|
||||
|
||||
# insert vs copy
|
||||
permutation "s1-begin" "s2-begin" "s2-insert" "s1-copy" "s1-select" "s2-select" "s1-commit" "s2-commit" "s1-select"
|
Loading…
Reference in New Issue