diff --git a/Makefile b/Makefile index 0d581f145..6804bae42 100644 --- a/Makefile +++ b/Makefile @@ -55,7 +55,7 @@ ifeq ($(USE_TABLEAM),yes) REGRESS += am_create am_load am_query am_analyze am_data_types am_functions \ am_drop am_insert am_copyto am_alter am_rollback am_truncate am_vacuum am_clean \ am_block_filtering am_join am_trigger - ISOLATION += am_vacuum_vs_insert + ISOLATION += am_write_concurrency am_vacuum_vs_insert endif ifeq ($(enable_coverage),yes) diff --git a/cstore.h b/cstore.h index f5e0590a8..7ff657e33 100644 --- a/cstore.h +++ b/cstore.h @@ -218,15 +218,12 @@ typedef struct TableReadState /* TableWriteState represents state of a cstore file write operation. */ typedef struct TableWriteState { - DataFileMetadata *datafileMetadata; CompressionType compressionType; TupleDesc tupleDescriptor; FmgrInfo **comparisonFunctionArray; - uint64 currentFileOffset; Relation relation; MemoryContext stripeWriteContext; - uint64 currentStripeId; StripeBuffers *stripeBuffers; StripeSkipList *stripeSkipList; uint32 stripeMaxRowCount; @@ -284,9 +281,11 @@ extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressio /* cstore_metadata_tables.c */ extern void DeleteDataFileMetadataRowIfExists(Oid relfilenode); extern void InitCStoreDataFileMetadata(Oid relfilenode, int blockRowCount); -extern void InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe); extern DataFileMetadata * ReadDataFileMetadata(Oid relfilenode, bool missingOk); extern uint64 GetHighestUsedAddress(Oid relfilenode); +extern StripeMetadata ReserveStripe(Relation rel, uint64 size, + uint64 rowCount, uint64 columnCount, + uint64 blockCount, uint64 blockRowCount); extern void SaveStripeSkipList(Oid relfilenode, uint64 stripe, StripeSkipList *stripeSkipList, TupleDesc tupleDescriptor); @@ -317,4 +316,30 @@ logical_to_smgr(uint64 logicalOffset) } +/* + * Map a physical page and offset address to a logical address. 
+ */ +static inline uint64 +smgr_to_logical(SmgrAddr addr) +{ + uint64 bytes_per_page = BLCKSZ - SizeOfPageHeaderData; + return bytes_per_page * addr.blockno + addr.offset - SizeOfPageHeaderData; +} + + +/* + * Get the first usable address of next block. + */ +static inline SmgrAddr +next_block_start(SmgrAddr addr) +{ + SmgrAddr result = { + .blockno = addr.blockno + 1, + .offset = SizeOfPageHeaderData + }; + + return result; +} + + #endif /* CSTORE_H */ diff --git a/cstore_fdw.c b/cstore_fdw.c index 2790efaca..221c97843 100644 --- a/cstore_fdw.c +++ b/cstore_fdw.c @@ -494,10 +494,10 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString) Assert(copyStatement->relation != NULL); /* - * Open and lock the relation. We acquire ShareUpdateExclusiveLock to allow - * concurrent reads, but block concurrent writes. + * Open and lock the relation. We acquire RowExclusiveLock to allow + * concurrent reads and writes. */ - relation = cstore_fdw_openrv(copyStatement->relation, ShareUpdateExclusiveLock); + relation = cstore_fdw_openrv(copyStatement->relation, RowExclusiveLock); relationId = RelationGetRelid(relation); /* allocate column values and nulls arrays */ @@ -572,7 +572,7 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString) /* end read/write sessions and close the relation */ EndCopyFrom(copyState); CStoreEndWrite(writeState); - heap_close(relation, ShareUpdateExclusiveLock); + heap_close(relation, RowExclusiveLock); return processedRowCount; } @@ -2015,7 +2015,7 @@ CStoreBeginForeignInsert(ModifyTableState *modifyTableState, ResultRelInfo *rela Relation relation = NULL; foreignTableOid = RelationGetRelid(relationInfo->ri_RelationDesc); - relation = cstore_fdw_open(foreignTableOid, ShareUpdateExclusiveLock); + relation = cstore_fdw_open(foreignTableOid, RowExclusiveLock); cstoreOptions = CStoreGetOptions(foreignTableOid); tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc); @@ -2086,7 +2086,7 @@ 
CStoreEndForeignInsert(EState *executorState, ResultRelInfo *relationInfo) Relation relation = writeState->relation; CStoreEndWrite(writeState); - heap_close(relation, ShareUpdateExclusiveLock); + heap_close(relation, RowExclusiveLock); } } diff --git a/cstore_metadata_tables.c b/cstore_metadata_tables.c index ced5900d6..1bfc4be49 100644 --- a/cstore_metadata_tables.c +++ b/cstore_metadata_tables.c @@ -31,6 +31,8 @@ #include "lib/stringinfo.h" #include "port.h" #include "storage/fd.h" +#include "storage/lmgr.h" +#include "storage/smgr.h" #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/memutils.h" @@ -43,6 +45,10 @@ typedef struct EState *estate; } ModifyState; +static void InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe); +static void GetHighestUsedAddressAndId(Oid relfilenode, + uint64 *highestUsedAddress, + uint64 *highestUsedId); static List * ReadDataFileStripeList(Oid relfilenode, Snapshot snapshot); static Oid CStoreStripesRelationId(void); static Oid CStoreStripesIndexRelationId(void); @@ -311,7 +317,7 @@ ReadStripeSkipList(Oid relfilenode, uint64 stripe, TupleDesc tupleDescriptor, /* * InsertStripeMetadataRow adds a row to cstore_stripes. 
*/ -void +static void InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe) { bool nulls[Natts_cstore_stripes] = { 0 }; @@ -330,7 +336,9 @@ InsertStripeMetadataRow(Oid relfilenode, StripeMetadata *stripe) Relation cstoreStripes = heap_open(cstoreStripesOid, RowExclusiveLock); ModifyState *modifyState = StartModifyRelation(cstoreStripes); + InsertTupleAndEnforceConstraints(modifyState, values, nulls); + FinishModifyRelation(modifyState); CommandCounterIncrement(); @@ -376,6 +384,23 @@ uint64 GetHighestUsedAddress(Oid relfilenode) { uint64 highestUsedAddress = 0; + uint64 highestUsedId = 0; + + GetHighestUsedAddressAndId(relfilenode, &highestUsedAddress, &highestUsedId); + + return highestUsedAddress; +} + + +/* + * GetHighestUsedAddressAndId returns the highest used address and id for + * the given relfilenode across all active and inactive transactions. + */ +static void +GetHighestUsedAddressAndId(Oid relfilenode, + uint64 *highestUsedAddress, + uint64 *highestUsedId) +{ ListCell *stripeMetadataCell = NULL; List *stripeMetadataList = NIL; @@ -384,14 +409,83 @@ GetHighestUsedAddress(Oid relfilenode) stripeMetadataList = ReadDataFileStripeList(relfilenode, &SnapshotDirty); + *highestUsedId = 0; + *highestUsedAddress = 0; + foreach(stripeMetadataCell, stripeMetadataList) { StripeMetadata *stripe = lfirst(stripeMetadataCell); uint64 lastByte = stripe->fileOffset + stripe->dataLength - 1; - highestUsedAddress = Max(highestUsedAddress, lastByte); + *highestUsedAddress = Max(*highestUsedAddress, lastByte); + *highestUsedId = Max(*highestUsedId, stripe->id); + } +} + + +/* + * ReserveStripe reserves a stripe of given size for the given relation, + * and inserts it into cstore_stripes. It is guaranteed that concurrent + * writes won't overwrite the returned stripe. 
+ */ +StripeMetadata +ReserveStripe(Relation rel, uint64 sizeBytes, + uint64 rowCount, uint64 columnCount, + uint64 blockCount, uint64 blockRowCount) +{ + StripeMetadata stripe = { 0 }; + Oid relfilenode = InvalidOid; + uint64 currLogicalHigh = 0; + SmgrAddr currSmgrHigh; + uint64 nblocks = 0; + uint64 resLogicalStart = 0; + SmgrAddr resSmgrStart; + uint64 resLogicalEnd = 0; + SmgrAddr resSmgrEnd; + uint64 highestId = 0; + + /* + * We take ShareUpdateExclusiveLock here, so two space + * reservations conflict, space reservation <-> vacuum + * conflict, but space reservation doesn't conflict with + * reads & writes. + */ + LockRelation(rel, ShareUpdateExclusiveLock); + + relfilenode = rel->rd_node.relNode; + GetHighestUsedAddressAndId(relfilenode, &currLogicalHigh, &highestId); + currSmgrHigh = logical_to_smgr(currLogicalHigh); + + resSmgrStart = next_block_start(currSmgrHigh); + resLogicalStart = smgr_to_logical(resSmgrStart); + + resLogicalEnd = resLogicalStart + sizeBytes - 1; + resSmgrEnd = logical_to_smgr(resLogicalEnd); + + RelationOpenSmgr(rel); + nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + + while (resSmgrEnd.blockno >= nblocks) + { + Buffer newBuffer = ReadBuffer(rel, P_NEW); + ReleaseBuffer(newBuffer); + nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); } - return highestUsedAddress; + RelationCloseSmgr(rel); + + stripe.fileOffset = resLogicalStart; + stripe.dataLength = sizeBytes; + stripe.blockCount = blockCount; + stripe.blockRowCount = blockRowCount; + stripe.columnCount = columnCount; + stripe.rowCount = rowCount; + stripe.id = highestId + 1; + + InsertStripeMetadataRow(relfilenode, &stripe); + + UnlockRelation(rel, ShareUpdateExclusiveLock); + + return stripe; } @@ -419,7 +513,7 @@ ReadDataFileStripeList(Oid relfilenode, Snapshot snapshot) index = index_open(CStoreStripesIndexRelationId(), AccessShareLock); tupleDescriptor = RelationGetDescr(cstoreStripes); - scanDescriptor = systable_beginscan_ordered(cstoreStripes, index, NULL, 1, + 
scanDescriptor = systable_beginscan_ordered(cstoreStripes, index, snapshot, 1, scanKey); while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) @@ -593,6 +687,7 @@ InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values, bool *nulls) #if PG_VERSION_NUM >= 120000 TupleTableSlot *slot = ExecInitExtraTupleSlot(state->estate, tupleDescriptor, &TTSOpsHeapTuple); + ExecStoreHeapTuple(tuple, slot, false); #else TupleTableSlot *slot = ExecInitExtraTupleSlot(state->estate, tupleDescriptor); diff --git a/cstore_tableam.c b/cstore_tableam.c index 09a65d75b..ae7799410 100644 --- a/cstore_tableam.c +++ b/cstore_tableam.c @@ -120,8 +120,6 @@ ResetCStoreMemoryContext() static void cstore_init_write_state(Relation relation) { - /*TODO: upgrade lock to serialize writes */ - if (CStoreWriteState != NULL) { /* TODO: consider whether it's possible for a new write to start */ diff --git a/cstore_writer.c b/cstore_writer.c index 2c0ca541e..3be14994b 100644 --- a/cstore_writer.c +++ b/cstore_writer.c @@ -33,7 +33,7 @@ static StripeBuffers * CreateEmptyStripeBuffers(uint32 stripeMaxRowCount, static StripeSkipList * CreateEmptyStripeSkipList(uint32 stripeMaxRowCount, uint32 blockRowCount, uint32 columnCount); -static StripeMetadata FlushStripe(TableWriteState *writeState); +static void FlushStripe(TableWriteState *writeState); static StringInfo SerializeBoolArray(bool *boolArray, uint32 boolArrayLength); static void SerializeSingleDatum(StringInfo datumBuffer, Datum datum, bool datumTypeByValue, int datumTypeLength, @@ -45,8 +45,6 @@ static void UpdateBlockSkipNodeMinMax(ColumnBlockSkipNode *blockSkipNode, int columnTypeLength, Oid columnCollation, FmgrInfo *comparisonFunction); static Datum DatumCopy(Datum datum, bool datumTypeByValue, int datumTypeLength); -static void AppendStripeMetadata(DataFileMetadata *datafileMetadata, - StripeMetadata stripeMetadata); static StringInfo CopyStringInfo(StringInfo sourceString); @@ -64,34 +62,12 @@ CStoreBeginWrite(Relation 
relation, TupleDesc tupleDescriptor) { TableWriteState *writeState = NULL; - DataFileMetadata *datafileMetadata = NULL; FmgrInfo **comparisonFunctionArray = NULL; MemoryContext stripeWriteContext = NULL; - uint64 currentFileOffset = 0; uint32 columnCount = 0; uint32 columnIndex = 0; bool *columnMaskArray = NULL; BlockData *blockData = NULL; - uint64 currentStripeId = 0; - Oid relNode = relation->rd_node.relNode; - - datafileMetadata = ReadDataFileMetadata(relNode, false); - - /* - * If stripeMetadataList is not empty, jump to the position right after - * the last position. - */ - if (datafileMetadata->stripeMetadataList != NIL) - { - StripeMetadata *lastStripe = NULL; - uint64 lastStripeSize = 0; - - lastStripe = llast(datafileMetadata->stripeMetadataList); - lastStripeSize += lastStripe->dataLength; - - currentFileOffset = lastStripe->fileOffset + lastStripeSize; - currentStripeId = lastStripe->id + 1; - } /* get comparison function pointers for each of the columns */ columnCount = tupleDescriptor->natts; @@ -129,19 +105,16 @@ CStoreBeginWrite(Relation relation, writeState = palloc0(sizeof(TableWriteState)); writeState->relation = relation; - writeState->datafileMetadata = datafileMetadata; writeState->compressionType = compressionType; writeState->stripeMaxRowCount = stripeMaxRowCount; writeState->blockRowCount = blockRowCount; writeState->tupleDescriptor = tupleDescriptor; - writeState->currentFileOffset = currentFileOffset; writeState->comparisonFunctionArray = comparisonFunctionArray; writeState->stripeBuffers = NULL; writeState->stripeSkipList = NULL; writeState->stripeWriteContext = stripeWriteContext; writeState->blockData = blockData; writeState->compressionBuffer = NULL; - writeState->currentStripeId = currentStripeId; return writeState; } @@ -164,7 +137,6 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul StripeBuffers *stripeBuffers = writeState->stripeBuffers; StripeSkipList *stripeSkipList = writeState->stripeSkipList; 
uint32 columnCount = writeState->tupleDescriptor->natts; - DataFileMetadata *datafileMetadata = writeState->datafileMetadata; const uint32 blockRowCount = writeState->blockRowCount; BlockData *blockData = writeState->blockData; MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext); @@ -238,28 +210,14 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul stripeBuffers->rowCount++; if (stripeBuffers->rowCount >= writeState->stripeMaxRowCount) { - StripeMetadata stripeMetadata = FlushStripe(writeState); - MemoryContextReset(writeState->stripeWriteContext); - - writeState->currentStripeId++; + FlushStripe(writeState); /* set stripe data and skip list to NULL so they are recreated next time */ writeState->stripeBuffers = NULL; writeState->stripeSkipList = NULL; + } - /* - * Append stripeMetadata in old context so next MemoryContextReset - * doesn't free it. - */ - MemoryContextSwitchTo(oldContext); - InsertStripeMetadataRow(writeState->relation->rd_node.relNode, - &stripeMetadata); - AppendStripeMetadata(datafileMetadata, stripeMetadata); - } - else - { - MemoryContextSwitchTo(oldContext); - } + MemoryContextSwitchTo(oldContext); } @@ -278,17 +236,13 @@ CStoreEndWrite(TableWriteState *writeState) { MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext); - StripeMetadata stripeMetadata = FlushStripe(writeState); + FlushStripe(writeState); MemoryContextReset(writeState->stripeWriteContext); MemoryContextSwitchTo(oldContext); - InsertStripeMetadataRow(writeState->relation->rd_node.relNode, - &stripeMetadata); - AppendStripeMetadata(writeState->datafileMetadata, stripeMetadata); } MemoryContextDelete(writeState->stripeWriteContext); - list_free_deep(writeState->datafileMetadata->stripeMetadataList); pfree(writeState->comparisonFunctionArray); FreeBlockData(writeState->blockData); pfree(writeState); @@ -366,11 +320,9 @@ CreateEmptyStripeSkipList(uint32 stripeMaxRowCount, uint32 blockRowCount, 
static void -WriteToSmgr(TableWriteState *writeState, char *data, uint32 dataLength) +WriteToSmgr(Relation rel, uint64 logicalOffset, char *data, uint32 dataLength) { - uint64 logicalOffset = writeState->currentFileOffset; uint64 remaining = dataLength; - Relation rel = writeState->relation; Buffer buffer; while (remaining > 0) @@ -383,14 +335,7 @@ WriteToSmgr(TableWriteState *writeState, char *data, uint32 dataLength) RelationOpenSmgr(rel); nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); - - while (addr.blockno >= nblocks) - { - Buffer newBuffer = ReadBuffer(rel, P_NEW); - ReleaseBuffer(newBuffer); - nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); - } - + Assert(addr.blockno < nblocks); RelationCloseSmgr(rel); buffer = ReadBuffer(rel, addr.blockno); @@ -459,7 +404,7 @@ WriteToSmgr(TableWriteState *writeState, char *data, uint32 dataLength) * the function creates the skip list and footer buffers. Finally, the function * flushes the skip list, data, and footer buffers to the file. */ -static StripeMetadata +static void FlushStripe(TableWriteState *writeState) { StripeMetadata stripeMetadata = { 0 }; @@ -474,8 +419,9 @@ FlushStripe(TableWriteState *writeState) uint32 blockRowCount = writeState->blockRowCount; uint32 lastBlockIndex = stripeBuffers->rowCount / blockRowCount; uint32 lastBlockRowCount = stripeBuffers->rowCount % blockRowCount; - uint64 initialFileOffset = writeState->currentFileOffset; + uint64 currentFileOffset = 0; uint64 stripeSize = 0; + uint64 stripeRowCount = 0; /* * check if the last block needs serialization , the last block was not serialized @@ -520,6 +466,18 @@ FlushStripe(TableWriteState *writeState) } } + for (blockIndex = 0; blockIndex < blockCount; blockIndex++) + { + stripeRowCount += + stripeSkipList->blockSkipNodeArray[0][blockIndex].rowCount; + } + + stripeMetadata = ReserveStripe(writeState->relation, stripeSize, + stripeRowCount, columnCount, blockCount, + blockRowCount); + + currentFileOffset = stripeMetadata.fileOffset; + /* 
* Each stripe has only one section: * Data section, in which we store data for each column continuously. @@ -541,8 +499,9 @@ FlushStripe(TableWriteState *writeState) columnBuffers->blockBuffersArray[blockIndex]; StringInfo existsBuffer = blockBuffers->existsBuffer; - WriteToSmgr(writeState, existsBuffer->data, existsBuffer->len); - writeState->currentFileOffset += existsBuffer->len; + WriteToSmgr(writeState->relation, currentFileOffset, + existsBuffer->data, existsBuffer->len); + currentFileOffset += existsBuffer->len; } for (blockIndex = 0; blockIndex < stripeSkipList->blockCount; blockIndex++) @@ -551,30 +510,16 @@ FlushStripe(TableWriteState *writeState) columnBuffers->blockBuffersArray[blockIndex]; StringInfo valueBuffer = blockBuffers->valueBuffer; - WriteToSmgr(writeState, valueBuffer->data, valueBuffer->len); - writeState->currentFileOffset += valueBuffer->len; + WriteToSmgr(writeState->relation, currentFileOffset, + valueBuffer->data, valueBuffer->len); + currentFileOffset += valueBuffer->len; } } /* create skip list and footer buffers */ SaveStripeSkipList(writeState->relation->rd_node.relNode, - writeState->currentStripeId, + stripeMetadata.id, stripeSkipList, tupleDescriptor); - - for (blockIndex = 0; blockIndex < blockCount; blockIndex++) - { - stripeMetadata.rowCount += - stripeSkipList->blockSkipNodeArray[0][blockIndex].rowCount; - } - - stripeMetadata.fileOffset = initialFileOffset; - stripeMetadata.dataLength = stripeSize; - stripeMetadata.id = writeState->currentStripeId; - stripeMetadata.blockCount = blockCount; - stripeMetadata.blockRowCount = writeState->blockRowCount; - stripeMetadata.columnCount = columnCount; - - return stripeMetadata; } @@ -797,21 +742,6 @@ DatumCopy(Datum datum, bool datumTypeByValue, int datumTypeLength) } -/* - * AppendStripeMetadata adds a copy of given stripeMetadata to the given - * table footer's stripeMetadataList. 
- */ -static void -AppendStripeMetadata(DataFileMetadata *datafileMetadata, StripeMetadata stripeMetadata) -{ - StripeMetadata *stripeMetadataCopy = palloc0(sizeof(StripeMetadata)); - memcpy(stripeMetadataCopy, &stripeMetadata, sizeof(StripeMetadata)); - - datafileMetadata->stripeMetadataList = lappend(datafileMetadata->stripeMetadataList, - stripeMetadataCopy); -} - - /* * CopyStringInfo creates a deep copy of given source string allocating only needed * amount of memory. diff --git a/expected/am_vacuum.out b/expected/am_vacuum.out index 3db30a761..d1270a3d2 100644 --- a/expected/am_vacuum.out +++ b/expected/am_vacuum.out @@ -68,24 +68,24 @@ ALTER TABLE t DROP COLUMN a; SELECT stripe, attr, block, minimum_value IS NULL, maximum_value IS NULL FROM cstore.cstore_skipnodes a, pg_class b WHERE a.relfilenode=b.relfilenode AND b.relname='t' ORDER BY 1, 2, 3; stripe | attr | block | ?column? | ?column? --------+------+-------+----------+---------- - 0 | 1 | 0 | f | f - 0 | 2 | 0 | f | f 1 | 1 | 0 | f | f 1 | 2 | 0 | f | f 2 | 1 | 0 | f | f 2 | 2 | 0 | f | f + 3 | 1 | 0 | f | f + 3 | 2 | 0 | f | f (6 rows) VACUUM FULL t; SELECT stripe, attr, block, minimum_value IS NULL, maximum_value IS NULL FROM cstore.cstore_skipnodes a, pg_class b WHERE a.relfilenode=b.relfilenode AND b.relname='t' ORDER BY 1, 2, 3; stripe | attr | block | ?column? | ?column? 
--------+------+-------+----------+---------- - 0 | 1 | 0 | t | t - 0 | 2 | 0 | f | f 1 | 1 | 0 | t | t 1 | 2 | 0 | f | f 2 | 1 | 0 | t | t 2 | 2 | 0 | f | f + 3 | 1 | 0 | t | t + 3 | 2 | 0 | f | f (6 rows) -- Make sure we cleaned-up the transient table metadata after VACUUM FULL commands @@ -107,14 +107,14 @@ SELECT count(*) FROM t; SELECT pg_size_pretty(pg_relation_size('t')); pg_size_pretty ---------------- - 16 kB + 32 kB (1 row) INSERT INTO t SELECT i FROM generate_series(1, 10000) i; SELECT pg_size_pretty(pg_relation_size('t')); pg_size_pretty ---------------- - 56 kB + 112 kB (1 row) SELECT count(*) FROM t; @@ -129,23 +129,23 @@ ROLLBACK TO SAVEPOINT s1; SELECT pg_size_pretty(pg_relation_size('t')); pg_size_pretty ---------------- - 56 kB + 112 kB (1 row) COMMIT; -- vacuum should truncate the relation to the usable space VACUUM VERBOSE t; INFO: statistics for "t": -total file size: 57344, total data size: 10754 +total file size: 114688, total data size: 10754 total row count: 2530, stripe count: 3, average rows per stripe: 843 block count: 3, containing data for dropped columns: 0, none compressed: 3, pglz compressed: 0 -INFO: "t": truncated 7 to 2 pages +INFO: "t": truncated 14 to 4 pages DETAIL: CPU: user: 0.00 s, system: 0.00 s, elapsed: 0.00 s SELECT pg_size_pretty(pg_relation_size('t')); pg_size_pretty ---------------- - 16 kB + 32 kB (1 row) SELECT count(*) FROM t; @@ -172,7 +172,7 @@ INSERT INTO t SELECT i / 5 FROM generate_series(1, 1500) i; COMMIT; VACUUM VERBOSE t; INFO: statistics for "t": -total file size: 24576, total data size: 18808 +total file size: 49152, total data size: 18808 total row count: 5530, stripe count: 5, average rows per stripe: 1106 block count: 7, containing data for dropped columns: 0, none compressed: 5, pglz compressed: 2 @@ -188,7 +188,7 @@ INSERT INTO t SELECT 1, i / 5 FROM generate_series(1, 1500) i; ALTER TABLE t DROP COLUMN c; VACUUM VERBOSE t; INFO: statistics for "t": -total file size: 32768, total data size: 31372 
+total file size: 65536, total data size: 31372 total row count: 7030, stripe count: 6, average rows per stripe: 1171 block count: 11, containing data for dropped columns: 2, none compressed: 9, pglz compressed: 2 @@ -199,7 +199,7 @@ SET cstore.compression TO "pglz"; VACUUM FULL t; VACUUM VERBOSE t; INFO: statistics for "t": -total file size: 16384, total data size: 15728 +total file size: 49152, total data size: 15728 total row count: 7030, stripe count: 4, average rows per stripe: 1757 block count: 8, containing data for dropped columns: 0, none compressed: 2, pglz compressed: 6 diff --git a/expected/am_vacuum_vs_insert.out b/expected/am_vacuum_vs_insert.out index a3eb0fb89..d463bd076 100644 --- a/expected/am_vacuum_vs_insert.out +++ b/expected/am_vacuum_vs_insert.out @@ -11,7 +11,7 @@ step s1-insert: INSERT INTO test_vacuum_vs_insert SELECT i, 2 * i FROM generate_series(1, 3) i; s2: INFO: statistics for "test_vacuum_vs_insert": -total file size: 8192, total data size: 26 +total file size: 24576, total data size: 26 total row count: 3, stripe count: 1, average rows per stripe: 3 block count: 2, containing data for dropped columns: 0, none compressed: 2, pglz compressed: 0 @@ -51,7 +51,7 @@ step s1-commit: COMMIT; s2: INFO: vacuuming "public.test_vacuum_vs_insert" -s2: INFO: "test_vacuum_vs_insert": found 0 removable, 6 nonremovable row versions in 1 pages +s2: INFO: "test_vacuum_vs_insert": found 0 removable, 6 nonremovable row versions in 3 pages DETAIL: 0 dead row versions cannot be removed yet. CPU: user: 0.00 s, system: 0.00 s, elapsed: 0.00 s. step s2-vacuum-full: <... 
completed> diff --git a/expected/am_write_concurrency.out b/expected/am_write_concurrency.out new file mode 100644 index 000000000..41c6ee7e6 --- /dev/null +++ b/expected/am_write_concurrency.out @@ -0,0 +1,142 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-begin s2-begin s1-insert s2-insert s1-select s2-select s1-commit s2-commit s1-select +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1-insert: + INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(1, 3) i; + +step s2-insert: + INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i; + +step s1-select: + SELECT * FROM test_insert_concurrency ORDER BY a; + +a b + +1 2 +2 4 +3 6 +step s2-select: + SELECT * FROM test_insert_concurrency ORDER BY a; + +a b + +4 8 +5 10 +6 12 +step s1-commit: + COMMIT; + +step s2-commit: + COMMIT; + +step s1-select: + SELECT * FROM test_insert_concurrency ORDER BY a; + +a b + +1 2 +2 4 +3 6 +4 8 +5 10 +6 12 + +starting permutation: s1-begin s2-begin s1-copy s2-insert s1-select s2-select s1-commit s2-commit s1-select +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1-copy: + COPY test_insert_concurrency(a) FROM PROGRAM 'seq 11 13'; + +step s2-insert: + INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i; + +step s1-select: + SELECT * FROM test_insert_concurrency ORDER BY a; + +a b + +11 +12 +13 +step s2-select: + SELECT * FROM test_insert_concurrency ORDER BY a; + +a b + +4 8 +5 10 +6 12 +step s1-commit: + COMMIT; + +step s2-commit: + COMMIT; + +step s1-select: + SELECT * FROM test_insert_concurrency ORDER BY a; + +a b + +4 8 +5 10 +6 12 +11 +12 +13 + +starting permutation: s1-begin s2-begin s2-insert s1-copy s1-select s2-select s1-commit s2-commit s1-select +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s2-insert: + INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i; + +step s1-copy: + COPY test_insert_concurrency(a) FROM 
PROGRAM 'seq 11 13'; + +step s1-select: + SELECT * FROM test_insert_concurrency ORDER BY a; + +a b + +11 +12 +13 +step s2-select: + SELECT * FROM test_insert_concurrency ORDER BY a; + +a b + +4 8 +5 10 +6 12 +step s1-commit: + COMMIT; + +step s2-commit: + COMMIT; + +step s1-select: + SELECT * FROM test_insert_concurrency ORDER BY a; + +a b + +4 8 +5 10 +6 12 +11 +12 +13 diff --git a/specs/am_write_concurrency.spec b/specs/am_write_concurrency.spec new file mode 100644 index 000000000..7b5d90a4d --- /dev/null +++ b/specs/am_write_concurrency.spec @@ -0,0 +1,67 @@ +setup +{ + CREATE TABLE test_insert_concurrency (a int, b int) USING cstore_tableam; +} + +teardown +{ + DROP TABLE IF EXISTS test_insert_concurrency CASCADE; +} + +session "s1" + +step "s1-begin" +{ + BEGIN; +} + +step "s1-insert" +{ + INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(1, 3) i; +} + +step "s1-copy" +{ + COPY test_insert_concurrency(a) FROM PROGRAM 'seq 11 13'; +} + +step "s1-select" +{ + SELECT * FROM test_insert_concurrency ORDER BY a; +} + +step "s1-commit" +{ + COMMIT; +} + +session "s2" + +step "s2-begin" +{ + BEGIN; +} + +step "s2-insert" +{ + INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i; +} + +step "s2-select" +{ + SELECT * FROM test_insert_concurrency ORDER BY a; +} + +step "s2-commit" +{ + COMMIT; +} + +# writes shouldn't block writes or reads +permutation "s1-begin" "s2-begin" "s1-insert" "s2-insert" "s1-select" "s2-select" "s1-commit" "s2-commit" "s1-select" + +# copy vs insert +permutation "s1-begin" "s2-begin" "s1-copy" "s2-insert" "s1-select" "s2-select" "s1-commit" "s2-commit" "s1-select" + +# insert vs copy +permutation "s1-begin" "s2-begin" "s2-insert" "s1-copy" "s1-select" "s2-select" "s1-commit" "s2-commit" "s1-select"