diff --git a/src/backend/columnar/columnar_metadata.c b/src/backend/columnar/columnar_metadata.c index 79c7946d2..936fe39a5 100644 --- a/src/backend/columnar/columnar_metadata.c +++ b/src/backend/columnar/columnar_metadata.c @@ -81,10 +81,14 @@ typedef enum RowNumberLookupMode FIND_GREATER } RowNumberLookupMode; -static void InsertStripeMetadataRow(uint64 storageId, StripeMetadata *stripe); +static void InsertEmptyStripeMetadataRow(uint64 storageId, uint64 stripeId, + uint32 columnCount, uint32 chunkGroupRowCount, + uint64 firstRowNumber); static void GetHighestUsedAddressAndId(uint64 storageId, uint64 *highestUsedAddress, uint64 *highestUsedId); +static StripeMetadata * UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, + bool *update, Datum *newValues); static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot); static StripeMetadata * BuildStripeMetadata(Datum *datumArray); static uint32 * ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 @@ -118,6 +122,7 @@ static bool WriteColumnarOptions(Oid regclass, ColumnarOptions *options, bool ov static StripeMetadata * StripeMetadataLookupRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot, RowNumberLookupMode lookupMode); +static void CheckStripeMetadataConsistency(StripeMetadata *stripeMetadata); PG_FUNCTION_INFO_V1(columnar_relation_storageid); @@ -656,8 +661,7 @@ StripeMetadata * FindStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot) { StripeMetadata *stripeMetadata = - StripeMetadataLookupRowNumber(relation, rowNumber, - snapshot, FIND_LESS_OR_EQUAL); + FindStripeWithMatchingFirstRowNumber(relation, rowNumber, snapshot); if (!stripeMetadata) { return NULL; @@ -672,6 +676,46 @@ FindStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot) } +/* + * FindStripeWithMatchingFirstRowNumber returns a StripeMetadata object for + * the stripe that has the greatest firstRowNumber among the stripes whose + * firstRowNumber is smaller than or equal to given rowNumber. If no such + * stripe exists, then returns NULL. + * + * Note that this doesn't mean that found stripe certainly contains the tuple + * with given rowNumber. This is because, it also needs to be verified if + * highest row number that found stripe contains is greater than or equal to + * given rowNumber. For this reason, unless that additional check is done, + * this function is mostly useful for checking against "possible" constraint + * violations due to concurrent writes that are not flushed by other backends + * yet. + */ +StripeMetadata * +FindStripeWithMatchingFirstRowNumber(Relation relation, uint64 rowNumber, + Snapshot snapshot) +{ + return StripeMetadataLookupRowNumber(relation, rowNumber, snapshot, + FIND_LESS_OR_EQUAL); +} + + +/* + * StripeIsFlushed returns true if stripe with stripeMetadata is flushed to + * disk. + */ +bool +StripeIsFlushed(StripeMetadata *stripeMetadata) +{ + /* + * We insert dummy stripe metadata entry when inserting the first row. + * For this reason, rowCount being equal to 0 cannot mean a valid stripe + * with 0 rows but a stripe that is not flushed to disk, probably because + * of an aborted xact. + */ + return stripeMetadata->rowCount > 0; +} + + /* * StripeGetHighestRowNumber returns rowNumber of the row with highest * rowNumber in given stripe. @@ -744,6 +788,7 @@ StripeMetadataLookupRowNumber(Relation relation, uint64 rowNumber, Snapshot snap heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); foundStripeMetadata = BuildStripeMetadata(datumArray); + CheckStripeMetadataConsistency(foundStripeMetadata); } systable_endscan_ordered(scanDescriptor); @@ -754,6 +799,35 @@ StripeMetadataLookupRowNumber(Relation relation, uint64 rowNumber, Snapshot snap } +/* + * CheckStripeMetadataConsistency errors out if given StripeMetadata object + * belongs to an un-flushed stripe but some fields of it contradicts with + * this fact. + */ +static void +CheckStripeMetadataConsistency(StripeMetadata *stripeMetadata) +{ + if (StripeIsFlushed(stripeMetadata)) + { + return; + } + + if (stripeMetadata->rowCount > 0 || stripeMetadata->chunkCount > 0 || + stripeMetadata->fileOffset != ColumnarInvalidLogicalOffset || + stripeMetadata->dataLength > 0) + { + /* + * If stripe was not flushed to disk, then values of given four + * fields should match the columns inserted by + * InsertEmptyStripeMetadataRow. + */ + ereport(ERROR, (errmsg("unexpected stripe state, stripe with id=" + UINT64_FORMAT " was not flushed properly", + stripeMetadata->id))); + } +} + + /* * FindStripeWithHighestRowNumber returns StripeMetadata for the stripe that * has the row with highest rowNumber by doing backward index scan on @@ -856,23 +930,36 @@ ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount, /* - * InsertStripeMetadataRow adds a row to columnar.stripe. + * InsertEmptyStripeMetadataRow adds a row to columnar.stripe for the empty + * stripe reservation made for stripeId. */ static void -InsertStripeMetadataRow(uint64 storageId, StripeMetadata *stripe) +InsertEmptyStripeMetadataRow(uint64 storageId, uint64 stripeId, uint32 columnCount, + uint32 chunkGroupRowCount, uint64 firstRowNumber) { - bool nulls[Natts_columnar_stripe] = { 0 }; - Datum values[Natts_columnar_stripe] = { - UInt64GetDatum(storageId), - Int64GetDatum(stripe->id), - Int64GetDatum(stripe->fileOffset), - Int64GetDatum(stripe->dataLength), - Int32GetDatum(stripe->columnCount), - Int32GetDatum(stripe->chunkGroupRowCount), - Int64GetDatum(stripe->rowCount), - Int32GetDatum(stripe->chunkCount), - UInt64GetDatum(stripe->firstRowNumber) - }; + bool nulls[Natts_columnar_stripe] = { false }; + + Datum values[Natts_columnar_stripe] = { 0 }; + values[Anum_columnar_stripe_storageid - 1] = + UInt64GetDatum(storageId); + values[Anum_columnar_stripe_stripe - 1] = + UInt64GetDatum(stripeId); + values[Anum_columnar_stripe_column_count - 1] = + UInt32GetDatum(columnCount); + values[Anum_columnar_stripe_chunk_row_count - 1] = + UInt32GetDatum(chunkGroupRowCount); + values[Anum_columnar_stripe_first_row_number - 1] = + UInt64GetDatum(firstRowNumber); + + /* stripe has no rows yet, so initialize rest of the columns accordingly */ + values[Anum_columnar_stripe_row_count - 1] = + UInt64GetDatum(0); + values[Anum_columnar_stripe_file_offset - 1] = + UInt64GetDatum(ColumnarInvalidLogicalOffset); + values[Anum_columnar_stripe_data_length - 1] = + UInt64GetDatum(0); + values[Anum_columnar_stripe_chunk_count - 1] = + UInt32GetDatum(0); Oid columnarStripesOid = ColumnarStripeRelationId(); Relation columnarStripes = table_open(columnarStripesOid, RowExclusiveLock); @@ -953,35 +1040,123 @@ GetHighestUsedAddressAndId(uint64 storageId, /* - * ReserveStripe reserves and stripe of given size for the given relation, + * ReserveEmptyStripe reserves an empty stripe for given relation * and inserts it into columnar.stripe. It is guaranteed that concurrent * writes won't overwrite the returned stripe. */ -StripeMetadata -ReserveStripe(Relation rel, uint64 sizeBytes, - uint64 rowCount, uint64 columnCount, - uint64 chunkCount, uint64 chunkGroupRowCount, - uint64 stripeFirstRowNumber) +EmptyStripeReservation * +ReserveEmptyStripe(Relation rel, uint64 columnCount, uint64 chunkGroupRowCount, + uint64 stripeRowCount) { - StripeMetadata stripe = { 0 }; + EmptyStripeReservation *stripeReservation = palloc0(sizeof(EmptyStripeReservation)); uint64 storageId = ColumnarStorageGetStorageId(rel, false); - uint64 stripeId = ColumnarStorageReserveStripe(rel); + stripeReservation->stripeId = ColumnarStorageReserveStripeId(rel); + stripeReservation->stripeFirstRowNumber = + ColumnarStorageReserveRowNumber(rel, stripeRowCount); + + /* + * XXX: Instead of inserting a dummy entry to columnar.stripe and + * updating it when flushing the stripe, we could have a hash table + * in shared memory for the bookkeeping of ongoing writes. + */ + InsertEmptyStripeMetadataRow(storageId, stripeReservation->stripeId, + columnCount, chunkGroupRowCount, + stripeReservation->stripeFirstRowNumber); + + return stripeReservation; +} + + +/* + * CompleteStripeReservation completes reservation of the stripe with + * stripeId for given size and in-place updates related stripe metadata tuple + * to complete reservation. + */ +StripeMetadata * +CompleteStripeReservation(Relation rel, uint64 stripeId, uint64 sizeBytes, + uint64 rowCount, uint64 chunkCount) +{ uint64 resLogicalStart = ColumnarStorageReserveData(rel, sizeBytes); + uint64 storageId = ColumnarStorageGetStorageId(rel, false); - stripe.fileOffset = resLogicalStart; - stripe.dataLength = sizeBytes; - stripe.chunkCount = chunkCount; - stripe.chunkGroupRowCount = chunkGroupRowCount; - stripe.columnCount = columnCount; - stripe.rowCount = rowCount; - stripe.id = stripeId; - stripe.firstRowNumber = stripeFirstRowNumber; + bool update[Natts_columnar_stripe] = { false }; + update[Anum_columnar_stripe_file_offset - 1] = true; + update[Anum_columnar_stripe_data_length - 1] = true; + update[Anum_columnar_stripe_row_count - 1] = true; + update[Anum_columnar_stripe_chunk_count - 1] = true; - InsertStripeMetadataRow(storageId, &stripe); + Datum newValues[Natts_columnar_stripe] = { 0 }; + newValues[Anum_columnar_stripe_file_offset - 1] = Int64GetDatum(resLogicalStart); + newValues[Anum_columnar_stripe_data_length - 1] = Int64GetDatum(sizeBytes); + newValues[Anum_columnar_stripe_row_count - 1] = UInt64GetDatum(rowCount); + newValues[Anum_columnar_stripe_chunk_count - 1] = Int32GetDatum(chunkCount); - return stripe; + return UpdateStripeMetadataRow(storageId, stripeId, update, newValues); +} + + +/* + * UpdateStripeMetadataRow updates stripe metadata tuple for the stripe with + * stripeId according to given newValues and update arrays. + * Note that this function shouldn't be used for the cases where any indexes + * of stripe metadata should be updated according to modifications done. + */ +static StripeMetadata * +UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, bool *update, + Datum *newValues) +{ + SnapshotData dirtySnapshot; + InitDirtySnapshot(dirtySnapshot); + + ScanKeyData scanKey[2]; + ScanKeyInit(&scanKey[0], Anum_columnar_stripe_storageid, + BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(storageId)); + ScanKeyInit(&scanKey[1], Anum_columnar_stripe_stripe, + BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripeId)); + + Oid columnarStripesOid = ColumnarStripeRelationId(); + + Relation columnarStripes = table_open(columnarStripesOid, AccessShareLock); + Relation columnarStripePkeyIndex = index_open(ColumnarStripePKeyIndexRelationId(), + AccessShareLock); + + SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarStripes, + columnarStripePkeyIndex, + &dirtySnapshot, 2, scanKey); + + HeapTuple oldTuple = systable_getnext_ordered(scanDescriptor, ForwardScanDirection); + if (!HeapTupleIsValid(oldTuple)) + { + ereport(ERROR, (errmsg("attempted to modify an unexpected stripe, " + "columnar storage with id=" UINT64_FORMAT + " does not have stripe with id=" UINT64_FORMAT, + storageId, stripeId))); + } + + /* + * heap_inplace_update already doesn't allow changing size of the original + * tuple, so we don't allow setting any Datum's to NULL values. + */ + bool newNulls[Natts_columnar_stripe] = { false }; + TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes); + HeapTuple modifiedTuple = heap_modify_tuple(oldTuple, tupleDescriptor, + newValues, newNulls, update); + + heap_inplace_update(columnarStripes, modifiedTuple); + + CommandCounterIncrement(); + + systable_endscan_ordered(scanDescriptor); + index_close(columnarStripePkeyIndex, AccessShareLock); + table_close(columnarStripes, AccessShareLock); + + /* return StripeMetadata object built from modified tuple */ + Datum datumArray[Natts_columnar_stripe]; + bool isNullArray[Natts_columnar_stripe]; + heap_deform_tuple(modifiedTuple, tupleDescriptor, datumArray, isNullArray); + return BuildStripeMetadata(datumArray); } diff --git a/src/backend/columnar/columnar_reader.c b/src/backend/columnar/columnar_reader.c index 61762160b..28381804d 100644 --- a/src/backend/columnar/columnar_reader.c +++ b/src/backend/columnar/columnar_reader.c @@ -38,6 +38,10 @@ #include "columnar/columnar_tableam.h" #include "columnar/columnar_version_compat.h" +#define UNEXPECTED_STRIPE_READ_ERR_MSG \ + "attempted to read an unexpected stripe while reading columnar " \ + "table %s, stripe with id=" UINT64_FORMAT " is not flushed" + typedef struct ChunkGroupReadState { int64 currentRow; @@ -115,6 +119,7 @@ static StripeReadState * BeginStripeRead(StripeMetadata *stripeMetadata, Relatio MemoryContext stripeReadContext, Snapshot snapshot); static void AdvanceStripeRead(ColumnarReadState *readState); +static bool SnapshotMightSeeUnflushedStripes(Snapshot snapshot); static bool ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues, bool *columnNulls); static ChunkGroupReadState * BeginChunkGroupRead(StripeBuffers *stripeBuffers, int @@ -283,6 +288,18 @@ ColumnarReadRowByRowNumber(ColumnarReadState *readState, return false; } + if (!StripeIsFlushed(stripeMetadata)) + { + /* + * Callers are expected to skip stripes that are not flushed to + * disk yet or should wait for the writer xact to commit or abort, + * but let's be on the safe side. + */ + ereport(ERROR, (errmsg(UNEXPECTED_STRIPE_READ_ERR_MSG, + RelationGetRelationName(columnarRelation), + stripeMetadata->id))); + } + /* do the cleanup before reading a new stripe */ ColumnarResetRead(readState); @@ -562,6 +579,30 @@ AdvanceStripeRead(ColumnarReadState *readState) readState->currentStripeMetadata = FindNextStripeByRowNumber(readState->relation, lastReadRowNumber, readState->snapshot); + + if (readState->currentStripeMetadata && + !StripeIsFlushed(readState->currentStripeMetadata) && + !SnapshotMightSeeUnflushedStripes(readState->snapshot)) + { + /* + * To be on the safe side, error out if we don't expect to encounter + * with an un-flushed stripe. Otherwise, we will skip such stripes + * until finding a flushed one. + */ + ereport(ERROR, (errmsg(UNEXPECTED_STRIPE_READ_ERR_MSG, + RelationGetRelationName(readState->relation), + readState->currentStripeMetadata->id))); + } + + while (readState->currentStripeMetadata && + !StripeIsFlushed(readState->currentStripeMetadata)) + { + readState->currentStripeMetadata = + FindNextStripeByRowNumber(readState->relation, + readState->currentStripeMetadata->firstRowNumber, + readState->snapshot); + } + readState->stripeReadState = NULL; MemoryContextReset(readState->stripeReadContext); @@ -569,6 +610,34 @@ AdvanceStripeRead(ColumnarReadState *readState) } +/* + * SnapshotMightSeeUnflushedStripes returns true if given snapshot is + * expected to see un-flushed stripes either because of other backends' + * pending writes or aborted transactions. + */ +static bool +SnapshotMightSeeUnflushedStripes(Snapshot snapshot) +{ + if (snapshot == InvalidSnapshot) + { + return false; + } + + switch (snapshot->snapshot_type) + { + case SNAPSHOT_ANY: + case SNAPSHOT_DIRTY: + case SNAPSHOT_NON_VACUUMABLE: + { + return true; + } + + default: + return false; + } +} + + /* * ReadStripeNextRow: If more rows can be read from the current stripe, fill * in non-NULL columnValues and return true. Otherwise, return false. diff --git a/src/backend/columnar/columnar_storage.c b/src/backend/columnar/columnar_storage.c index aba79303f..bc00877a2 100644 --- a/src/backend/columnar/columnar_storage.c +++ b/src/backend/columnar/columnar_storage.c @@ -371,15 +371,13 @@ ColumnarStorageReserveRowNumber(Relation rel, uint64 nrows) /* - * ColumnarStorageReserveStripe returns stripeId and advances it for next + * ColumnarStorageReserveStripeId returns stripeId and advances it for next * stripeId reservation. * Note that this function doesn't handle row number reservation. - * This is because, unlike stripeId reservation, we immediately reserve - * row number during writes, not when flushing stripes to disk. * See ColumnarStorageReserveRowNumber function. */ uint64 -ColumnarStorageReserveStripe(Relation rel) +ColumnarStorageReserveStripeId(Relation rel) { LockRelationForExtension(rel, ExclusiveLock); diff --git a/src/backend/columnar/columnar_tableam.c b/src/backend/columnar/columnar_tableam.c index 83e395fd5..36222fc48 100644 --- a/src/backend/columnar/columnar_tableam.c +++ b/src/backend/columnar/columnar_tableam.c @@ -547,12 +547,46 @@ columnar_index_fetch_tuple(struct IndexFetchTableData *sscan, } uint64 rowNumber = tid_to_row_number(*tid); - if (!ColumnarReadRowByRowNumber(scan->cs_readState, rowNumber, slot->tts_values, - slot->tts_isnull)) + StripeMetadata *stripeMetadata = + FindStripeWithMatchingFirstRowNumber(columnarRelation, rowNumber, snapshot); + if (!stripeMetadata) { + /* it is certain that tuple with rowNumber doesn't exist */ return false; } + if (StripeIsFlushed(stripeMetadata) && + !ColumnarReadRowByRowNumber(scan->cs_readState, rowNumber, + slot->tts_values, slot->tts_isnull)) + { + /* + * FindStripeWithMatchingFirstRowNumber doesn't verify upper row + * number boundary of found stripe. For this reason, we didn't + * certainly know if given row number belongs to one of the stripes. + */ + return false; + } + + if (!StripeIsFlushed(stripeMetadata)) + { + /* + * We only expect to see un-flushed stripes when checking against + * constraint violation. In that case, indexAM provides dirty + * snapshot to index_fetch_tuple callback. + */ + Assert(snapshot->snapshot_type == SNAPSHOT_DIRTY); + + /* + * Stripe that "might" contain the tuple with rowNumber is not + * flushed yet. Here we set all attributes of given tupleslot to NULL + * before returning true and expect the indexAM callback that called + * us --possibly to check against constraint violation-- blocks until + * writer transaction commits or aborts, without requiring us to fill + * the tupleslot properly. + */ + memset(slot->tts_isnull, true, slot->tts_nvalid); + } + slot->tts_tableOid = RelationGetRelid(columnarRelation); slot->tts_tid = *tid; ExecStoreVirtualTuple(slot); @@ -1393,7 +1427,8 @@ ColumnarGetHighestItemPointer(Relation relation, Snapshot snapshot) { StripeMetadata *stripeWithHighestRowNumber = FindStripeWithHighestRowNumber(relation, snapshot); - if (stripeWithHighestRowNumber == NULL) + if (stripeWithHighestRowNumber == NULL || + StripeGetHighestRowNumber(stripeWithHighestRowNumber) == 0) { /* table is empty according to our snapshot */ ItemPointerData invalidItemPtr; diff --git a/src/backend/columnar/columnar_writer.c b/src/backend/columnar/columnar_writer.c index cafef2381..1f2cb56e1 100644 --- a/src/backend/columnar/columnar_writer.c +++ b/src/backend/columnar/columnar_writer.c @@ -43,7 +43,7 @@ struct ColumnarWriteState MemoryContext perTupleContext; StripeBuffers *stripeBuffers; StripeSkipList *stripeSkipList; - uint64 stripeFirstRowNumber; + EmptyStripeReservation *emptyStripeReservation; ColumnarOptions options; ChunkData *chunkData; @@ -130,7 +130,7 @@ ColumnarBeginWrite(RelFileNode relfilenode, writeState->comparisonFunctionArray = comparisonFunctionArray; writeState->stripeBuffers = NULL; writeState->stripeSkipList = NULL; - writeState->stripeFirstRowNumber = COLUMNAR_INVALID_ROW_NUMBER; + writeState->emptyStripeReservation = NULL; writeState->stripeWriteContext = stripeWriteContext; writeState->chunkData = chunkData; writeState->compressionBuffer = NULL; @@ -177,9 +177,9 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu Oid relationId = RelidByRelfilenode(writeState->relfilenode.spcNode, writeState->relfilenode.relNode); Relation relation = relation_open(relationId, NoLock); - writeState->stripeFirstRowNumber = - ColumnarStorageReserveRowNumber(relation, - options->stripeRowCount); + writeState->emptyStripeReservation = + ReserveEmptyStripe(relation, columnCount, chunkRowCount, + options->stripeRowCount); relation_close(relation, NoLock); /* @@ -238,7 +238,8 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu SerializeChunkData(writeState, chunkIndex, chunkRowCount); } - uint64 writtenRowNumber = writeState->stripeFirstRowNumber + stripeBuffers->rowCount; + uint64 writtenRowNumber = writeState->emptyStripeReservation->stripeFirstRowNumber + + stripeBuffers->rowCount; stripeBuffers->rowCount++; if (stripeBuffers->rowCount >= options->stripeRowCount) { @@ -376,7 +377,6 @@ CreateEmptyStripeSkipList(uint32 stripeMaxRowCount, uint32 chunkRowCount, static void FlushStripe(ColumnarWriteState *writeState) { - StripeMetadata stripeMetadata = { 0 }; uint32 columnIndex = 0; uint32 chunkIndex = 0; StripeBuffers *stripeBuffers = writeState->stripeBuffers; @@ -442,11 +442,11 @@ FlushStripe(ColumnarWriteState *writeState) } } - stripeMetadata = ReserveStripe(relation, stripeSize, - stripeRowCount, columnCount, chunkCount, - chunkRowCount, writeState->stripeFirstRowNumber); + StripeMetadata *stripeMetadata = + CompleteStripeReservation(relation, writeState->emptyStripeReservation->stripeId, + stripeSize, stripeRowCount, chunkCount); - uint64 currentFileOffset = stripeMetadata.fileOffset; + uint64 currentFileOffset = stripeMetadata->fileOffset; /* * Each stripe has only one section: @@ -487,10 +487,10 @@ FlushStripe(ColumnarWriteState *writeState) } SaveChunkGroups(writeState->relfilenode, - stripeMetadata.id, + stripeMetadata->id, writeState->chunkGroupRowCounts); SaveStripeSkipList(writeState->relfilenode, - stripeMetadata.id, + stripeMetadata->id, stripeSkipList, tupleDescriptor); writeState->chunkGroupRowCounts = NIL; diff --git a/src/include/columnar/columnar.h b/src/include/columnar/columnar.h index 40cb81e70..799cd65ac 100644 --- a/src/include/columnar/columnar.h +++ b/src/include/columnar/columnar.h @@ -246,10 +246,12 @@ extern bool IsColumnarTableAmTable(Oid relationId); extern void DeleteMetadataRows(RelFileNode relfilenode); extern uint64 ColumnarMetadataNewStorageId(void); extern uint64 GetHighestUsedAddress(RelFileNode relfilenode); -extern StripeMetadata ReserveStripe(Relation rel, uint64 size, - uint64 rowCount, uint64 columnCount, - uint64 chunkCount, uint64 chunkGroupRowCount, - uint64 stripeFirstRowNumber); +extern EmptyStripeReservation * ReserveEmptyStripe(Relation rel, uint64 columnCount, + uint64 chunkGroupRowCount, + uint64 stripeRowCount); +extern StripeMetadata * CompleteStripeReservation(Relation rel, uint64 stripeId, + uint64 sizeBytes, uint64 rowCount, + uint64 chunkCount); extern void SaveStripeSkipList(RelFileNode relfilenode, uint64 stripe, StripeSkipList *stripeSkipList, TupleDesc tupleDescriptor); @@ -263,6 +265,10 @@ extern StripeMetadata * FindNextStripeByRowNumber(Relation relation, uint64 rowN Snapshot snapshot); extern StripeMetadata * FindStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot); +extern StripeMetadata * FindStripeWithMatchingFirstRowNumber(Relation relation, + uint64 rowNumber, + Snapshot snapshot); +extern bool StripeIsFlushed(StripeMetadata *stripeMetadata); extern uint64 StripeGetHighestRowNumber(StripeMetadata *stripeMetadata); extern StripeMetadata * FindStripeWithHighestRowNumber(Relation relation, Snapshot snapshot); diff --git a/src/include/columnar/columnar_metadata.h b/src/include/columnar/columnar_metadata.h index b3a9f1414..b2d19ff73 100644 --- a/src/include/columnar/columnar_metadata.h +++ b/src/include/columnar/columnar_metadata.h @@ -28,6 +28,16 @@ typedef struct StripeMetadata uint64 firstRowNumber; } StripeMetadata; +/* + * EmptyStripeReservation represents information for an empty stripe + * reservation. + */ +typedef struct EmptyStripeReservation +{ + uint64 stripeId; + uint64 stripeFirstRowNumber; +} EmptyStripeReservation; + extern List * StripesForRelfilenode(RelFileNode relfilenode); extern void ColumnarStorageUpdateIfNeeded(Relation rel, bool isUpgrade); diff --git a/src/include/columnar/columnar_storage.h b/src/include/columnar/columnar_storage.h index a8ac01432..e81cbb7de 100644 --- a/src/include/columnar/columnar_storage.h +++ b/src/include/columnar/columnar_storage.h @@ -53,7 +53,7 @@ extern uint64 ColumnarStorageGetReservedOffset(Relation rel, bool force); extern uint64 ColumnarStorageReserveData(Relation rel, uint64 amount); extern uint64 ColumnarStorageReserveRowNumber(Relation rel, uint64 nrows); -extern uint64 ColumnarStorageReserveStripe(Relation rel); +extern uint64 ColumnarStorageReserveStripeId(Relation rel); extern void ColumnarStorageRead(Relation rel, uint64 logicalOffset, char *data, uint32 amount); diff --git a/src/test/regress/columnar_isolation_schedule b/src/test/regress/columnar_isolation_schedule index 7183a1967..cfd5125c6 100644 --- a/src/test/regress/columnar_isolation_schedule +++ b/src/test/regress/columnar_isolation_schedule @@ -1,4 +1,4 @@ -test: columnar_write_concurrency +test: columnar_write_concurrency columnar_write_concurrency_index test: columnar_vacuum_vs_insert test: columnar_temp_tables test: columnar_index_concurrency diff --git a/src/test/regress/expected/columnar_rollback.out b/src/test/regress/expected/columnar_rollback.out index 1608b8298..2217dd788 100644 --- a/src/test/regress/expected/columnar_rollback.out +++ b/src/test/regress/expected/columnar_rollback.out @@ -19,7 +19,7 @@ select from columnar_test_helpers.columnar_storage_info('t'); version_major | version_minor | reserved_stripe_id | reserved_row_number --------------------------------------------------------------------- - 2 | 0 | 1 | 150001 + 2 | 0 | 2 | 150001 (1 row) -- check stripe metadata also have been rolled-back @@ -59,7 +59,7 @@ select from columnar_test_helpers.columnar_storage_info('t'); version_major | version_minor | reserved_stripe_id | reserved_row_number --------------------------------------------------------------------- - 2 | 0 | 3 | 600001 + 2 | 0 | 5 | 600001 (1 row) SELECT count(*) FROM t; @@ -89,7 +89,7 @@ select from columnar_test_helpers.columnar_storage_info('t'); version_major | version_minor | reserved_stripe_id | reserved_row_number --------------------------------------------------------------------- - 2 | 0 | 5 | 750001 + 2 | 0 | 6 | 750001 (1 row) SELECT count(*) FROM t; diff --git a/src/test/regress/expected/columnar_vacuum.out b/src/test/regress/expected/columnar_vacuum.out index 2ce1d0825..3850ce1da 100644 --- a/src/test/regress/expected/columnar_vacuum.out +++ b/src/test/regress/expected/columnar_vacuum.out @@ -244,7 +244,7 @@ select from columnar_test_helpers.columnar_storage_info('t'); version_major | version_minor | reserved_stripe_id | reserved_row_number --------------------------------------------------------------------- - 2 | 0 | 16 | 21001 + 2 | 0 | 18 | 21001 (1 row) SELECT * FROM columnar_test_helpers.chunk_group_consistency; diff --git a/src/test/regress/expected/columnar_write_concurrency.out b/src/test/regress/expected/columnar_write_concurrency.out index 88a4dc0e7..decb4a968 100644 --- a/src/test/regress/expected/columnar_write_concurrency.out +++ b/src/test/regress/expected/columnar_write_concurrency.out @@ -159,7 +159,7 @@ step s1-select: (6 rows) -starting permutation: s1-truncate s1-begin s1-insert-10000-rows s2-begin s2-insert s2-commit s1-commit s1-verify-metadata +starting permutation: s1-truncate s1-begin s1-insert-10000-rows s2-begin s2-insert s2-commit s1-commit step s1-truncate: TRUNCATE test_insert_concurrency; @@ -181,27 +181,6 @@ step s2-commit: step s1-commit: COMMIT; -step s1-verify-metadata: - WITH test_insert_concurrency_stripes AS ( - SELECT first_row_number, stripe_num, row_count - FROM columnar.stripe a, pg_class b - WHERE columnar_relation_storageid(b.oid)=a.storage_id AND - relname = 'test_insert_concurrency' - ) - SELECT - -- verify that table has two stripes .. - count(*) = 2 AND - -- .. and those stripes look like: - sum(case when stripe_num = 1 AND first_row_number = 150001 AND row_count = 3 then 1 end) = 1 AND - sum(case when stripe_num = 2 AND first_row_number = 1 AND row_count = 10000 then 1 end) = 1 - AS stripe_metadata_for_test_insert_concurrency_ok - FROM test_insert_concurrency_stripes; - -stripe_metadata_for_test_insert_concurrency_ok ---------------------------------------------------------------------- -t -(1 row) - starting permutation: s1-begin s2-begin-repeatable s1-insert s2-insert s2-select s1-commit s2-select s2-commit step s1-begin: diff --git a/src/test/regress/expected/columnar_write_concurrency_index.out b/src/test/regress/expected/columnar_write_concurrency_index.out new file mode 100644 index 000000000..5f10b972e --- /dev/null +++ b/src/test/regress/expected/columnar_write_concurrency_index.out @@ -0,0 +1,610 @@ +Parsed test spec with 3 sessions + +starting permutation: s1-begin s1-insert-1 s2-copy-1 s1-commit s1-select-all +step s1-begin: + BEGIN; + +step s1-insert-1: + INSERT INTO write_concurrency_index SELECT (3*s)::text, s FROM generate_series(1,2) s; + +step s2-copy-1: + COPY write_concurrency_index(b) FROM PROGRAM 'seq 1 4'; + +step s1-commit: + COMMIT; + +step s2-copy-1: <... completed> +ERROR: duplicate key value violates unique constraint "write_concurrency_index_b_key" +step s1-select-all: + SELECT * FROM write_concurrency_index ORDER BY a,b; + +a|b +--------------------------------------------------------------------- +3|1 +6|2 +(2 rows) + + +starting permutation: s1-begin s1-copy-1 s2-insert-1 s1-rollback s1-select-all +step s1-begin: + BEGIN; + +step s1-copy-1: + COPY write_concurrency_index(b) FROM PROGRAM 'seq 1 2'; + +step s2-insert-1: + INSERT INTO write_concurrency_index SELECT (2*s)::text, s FROM generate_series(1,4) s; + +step s1-rollback: + ROLLBACK; + +step s2-insert-1: <... completed> +step s1-select-all: + SELECT * FROM write_concurrency_index ORDER BY a,b; + +a|b +--------------------------------------------------------------------- +2|1 +4|2 +6|3 +8|4 +(4 rows) + + +starting permutation: s1-begin s1-copy-1 s2-insert-2 s3-insert-1 s1-commit s1-select-all +step s1-begin: + BEGIN; + +step s1-copy-1: + COPY write_concurrency_index(b) FROM PROGRAM 'seq 1 2'; + +step s2-insert-2: + INSERT INTO write_concurrency_index SELECT (5*s)::text, s FROM generate_series(1,2) s; + +step s3-insert-1: + INSERT INTO write_concurrency_index SELECT (7*s)::text, s FROM generate_series(3,4) s; + +step s1-commit: + COMMIT; + +step s2-insert-2: <... completed> +ERROR: duplicate key value violates unique constraint "write_concurrency_index_b_key" +step s1-select-all: + SELECT * FROM write_concurrency_index ORDER BY a,b; + + a|b +--------------------------------------------------------------------- +21|3 +28|4 + |1 + |2 +(4 rows) + + +starting permutation: s1-begin s1-insert-1 s2-insert-2 s3-insert-1 s1-rollback s1-select-all +step s1-begin: + BEGIN; + +step s1-insert-1: + INSERT INTO write_concurrency_index SELECT (3*s)::text, s FROM generate_series(1,2) s; + +step s2-insert-2: + INSERT INTO write_concurrency_index SELECT (5*s)::text, s FROM generate_series(1,2) s; + +step s3-insert-1: + INSERT INTO write_concurrency_index SELECT (7*s)::text, s FROM generate_series(3,4) s; + +step s1-rollback: + ROLLBACK; + +step s2-insert-2: <... completed> +step s1-select-all: + SELECT * FROM write_concurrency_index ORDER BY a,b; + + a|b +--------------------------------------------------------------------- +10|2 +21|3 +28|4 + 5|1 +(4 rows) + + +starting permutation: s1-begin s2-begin s1-insert-1 s2-insert-3 s3-insert-2 s1-commit s2-rollback s1-select-all +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1-insert-1: + INSERT INTO write_concurrency_index SELECT (3*s)::text, s FROM generate_series(1,2) s; + +step s2-insert-3: + INSERT INTO write_concurrency_index SELECT (5*s)::text, s FROM generate_series(3,4) s; + +step s3-insert-2: + INSERT INTO write_concurrency_index SELECT (7*s)::text, s FROM generate_series(2,3) s; + +step s1-commit: + COMMIT; + +step s3-insert-2: <... completed> +ERROR: duplicate key value violates unique constraint "write_concurrency_index_b_key" +step s2-rollback: + ROLLBACK; + +step s1-select-all: + SELECT * FROM write_concurrency_index ORDER BY a,b; + +a|b +--------------------------------------------------------------------- +3|1 +6|2 +(2 rows) + + +starting permutation: s1-begin s2-begin s1-copy-1 s2-copy-2 s3-insert-2 s1-rollback s2-commit s1-select-all +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1-copy-1: + COPY write_concurrency_index(b) FROM PROGRAM 'seq 1 2'; + +step s2-copy-2: + COPY write_concurrency_index(b) FROM PROGRAM 'seq 3 4'; + +step s3-insert-2: + INSERT INTO write_concurrency_index SELECT (7*s)::text, s FROM generate_series(2,3) s; + +step s1-rollback: + ROLLBACK; + +step s2-commit: + COMMIT; + +step s3-insert-2: <... completed> +ERROR: duplicate key value violates unique constraint "write_concurrency_index_b_key" +step s1-select-all: + SELECT * FROM write_concurrency_index ORDER BY a,b; + +a|b +--------------------------------------------------------------------- + |3 + |4 +(2 rows) + + +starting permutation: s1-begin s2-begin s1-insert-1 s2-copy-2 s3-insert-2 s1-rollback s2-rollback s1-select-all +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1-insert-1: + INSERT INTO write_concurrency_index SELECT (3*s)::text, s FROM generate_series(1,2) s; + +step s2-copy-2: + COPY write_concurrency_index(b) FROM PROGRAM 'seq 3 4'; + +step s3-insert-2: + INSERT INTO write_concurrency_index SELECT (7*s)::text, s FROM generate_series(2,3) s; + +step s1-rollback: + ROLLBACK; + +step s2-rollback: + ROLLBACK; + +step s3-insert-2: <... completed> +step s1-select-all: + SELECT * FROM write_concurrency_index ORDER BY a,b; + + a|b +--------------------------------------------------------------------- +14|2 +21|3 +(2 rows) + + +starting permutation: s1-begin s1-insert-2 s2-insert-4 s1-rollback s1-select-all +step s1-begin: + BEGIN; + +step s1-insert-2: + INSERT INTO write_concurrency_index SELECT s::text, 3*s FROM generate_series(1,2) s; + +step s2-insert-4: + INSERT INTO write_concurrency_index SELECT s::text, 2*s FROM generate_series(1,4) s; + +step s1-rollback: + ROLLBACK; + +step s2-insert-4: <... completed> +step s1-select-all: + SELECT * FROM write_concurrency_index ORDER BY a,b; + +a|b +--------------------------------------------------------------------- +1|2 +2|4 +3|6 +4|8 +(4 rows) + + +starting permutation: s1-begin s1-insert-2 s2-insert-5 s3-insert-3 s1-commit s1-select-all +step s1-begin: + BEGIN; + +step s1-insert-2: + INSERT INTO write_concurrency_index SELECT s::text, 3*s FROM generate_series(1,2) s; + +step s2-insert-5: + INSERT INTO write_concurrency_index SELECT s::text, 5*s FROM generate_series(1,2) s; + +step s3-insert-3: + INSERT INTO write_concurrency_index SELECT s::text, 7*s FROM generate_series(3,4) s; + +step s1-commit: + COMMIT; + +step s2-insert-5: <... completed> +ERROR: conflicting key value violates exclusion constraint "write_concurrency_index_a_excl" +step s1-select-all: + SELECT * FROM write_concurrency_index ORDER BY a,b; + +a| b +--------------------------------------------------------------------- +1| 3 +2| 6 +3|21 +4|28 +(4 rows) + + +starting permutation: s1-begin s2-begin s1-insert-2 s2-insert-6 s3-insert-4 s1-commit s2-rollback s1-select-all +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1-insert-2: + INSERT INTO write_concurrency_index SELECT s::text, 3*s FROM generate_series(1,2) s; + +step s2-insert-6: + INSERT INTO write_concurrency_index SELECT s::text, 5*s FROM generate_series(3,4) s; + +step s3-insert-4: + INSERT INTO write_concurrency_index SELECT s::text, 7*s FROM generate_series(2,3) s; + +step s1-commit: + COMMIT; + +step s3-insert-4: <... completed> +ERROR: conflicting key value violates exclusion constraint "write_concurrency_index_a_excl" +step s2-rollback: + ROLLBACK; + +step s1-select-all: + SELECT * FROM write_concurrency_index ORDER BY a,b; + +a|b +--------------------------------------------------------------------- +1|3 +2|6 +(2 rows) + + +starting permutation: s1-begin s2-begin s1-insert-2 s2-insert-6 s3-insert-4 s1-rollback s2-rollback s1-select-all +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1-insert-2: + INSERT INTO write_concurrency_index SELECT s::text, 3*s FROM generate_series(1,2) s; + +step s2-insert-6: + INSERT INTO write_concurrency_index SELECT s::text, 5*s FROM generate_series(3,4) s; + +step s3-insert-4: + INSERT INTO write_concurrency_index SELECT s::text, 7*s FROM generate_series(2,3) s; + +step s1-rollback: + ROLLBACK; + +step s2-rollback: + ROLLBACK; + +step s3-insert-4: <... completed> +step s1-select-all: + SELECT * FROM write_concurrency_index ORDER BY a,b; + +a| b +--------------------------------------------------------------------- +2|14 +3|21 +(2 rows) + + +starting permutation: s1-begin s1-insert-1 s2-index-select-all-b s1-rollback +step s1-begin: + BEGIN; + +step s1-insert-1: + INSERT INTO write_concurrency_index SELECT (3*s)::text, s FROM generate_series(1,2) s; + +step s2-index-select-all-b: + SET enable_seqscan TO OFF; + SET columnar.enable_custom_scan TO OFF; + SELECT b FROM write_concurrency_index ORDER BY 1; + +b +- +(0 rows) + +step s1-rollback: + ROLLBACK; + + +starting permutation: s1-begin s2-begin s1-insert-1 s2-copy-2 s2-index-select-all-b s3-index-select-all-b s1-commit s2-index-select-all-b s2-rollback +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1-insert-1: + INSERT INTO write_concurrency_index SELECT (3*s)::text, s FROM generate_series(1,2) s; + +step s2-copy-2: + COPY write_concurrency_index(b) FROM PROGRAM 'seq 3 4'; + +step s2-index-select-all-b: + SET enable_seqscan TO OFF; + SET columnar.enable_custom_scan TO OFF; + SELECT b FROM write_concurrency_index ORDER BY 1; + +b +- +3 +4 +(2 rows) + +step s3-index-select-all-b: + SET enable_seqscan TO OFF; + SET columnar.enable_custom_scan TO OFF; + SELECT b FROM write_concurrency_index ORDER BY 1; + +b +- +(0 rows) + +step s1-commit: + COMMIT; + +step s2-index-select-all-b: + SET enable_seqscan TO OFF; + SET columnar.enable_custom_scan TO OFF; + SELECT b FROM write_concurrency_index ORDER BY 1; + +b +- +1 +2 +3 +4 +(4 rows) + +step s2-rollback: + ROLLBACK; + + +starting permutation: s1-begin s2-begin s1-insert-1 s1-select-all s2-insert-1 s1-commit s2-rollback +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1-insert-1: + INSERT INTO write_concurrency_index SELECT (3*s)::text, s FROM generate_series(1,2) s; + +step s1-select-all: + SELECT * FROM write_concurrency_index ORDER BY a,b; + +a|b +--------------------------------------------------------------------- +3|1 +6|2 +(2 rows) + +step s2-insert-1: + INSERT INTO write_concurrency_index SELECT (2*s)::text, s FROM generate_series(1,4) s; + +step s1-commit: + COMMIT; + +step s2-insert-1: <... completed> +ERROR: duplicate key value violates unique constraint "write_concurrency_index_b_key" +step s2-rollback: + ROLLBACK; + + +starting permutation: s1-begin s2-begin s1-insert-1 s1-select-all s2-insert-1 s1-rollback s2-rollback +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1-insert-1: + INSERT INTO write_concurrency_index SELECT (3*s)::text, s FROM generate_series(1,2) s; + +step s1-select-all: + SELECT * FROM write_concurrency_index ORDER BY a,b; + +a|b +--------------------------------------------------------------------- +3|1 +6|2 +(2 rows) + +step s2-insert-1: + INSERT INTO write_concurrency_index SELECT (2*s)::text, s FROM generate_series(1,4) s; + +step s1-rollback: + ROLLBACK; + +step s2-insert-1: <... completed> +step s2-rollback: + ROLLBACK; + + +starting permutation: s1-begin s1-copy-1 s1-select-all s2-insert-2 s3-insert-1 s1-rollback s1-select-all +step s1-begin: + BEGIN; + +step s1-copy-1: + COPY write_concurrency_index(b) FROM PROGRAM 'seq 1 2'; + +step s1-select-all: + SELECT * FROM write_concurrency_index ORDER BY a,b; + +a|b +--------------------------------------------------------------------- + |1 + |2 +(2 rows) + +step s2-insert-2: + INSERT INTO write_concurrency_index SELECT (5*s)::text, s FROM generate_series(1,2) s; + +step s3-insert-1: + INSERT INTO write_concurrency_index SELECT (7*s)::text, s FROM generate_series(3,4) s; + +step s1-rollback: + ROLLBACK; + +step s2-insert-2: <... completed> +step s1-select-all: + SELECT * FROM write_concurrency_index ORDER BY a,b; + + a|b +--------------------------------------------------------------------- +10|2 +21|3 +28|4 + 5|1 +(4 rows) + + +starting permutation: s1-begin s1-insert-2 s1-select-all s2-insert-5 s3-insert-3 s1-commit s1-select-all +step s1-begin: + BEGIN; + +step s1-insert-2: + INSERT INTO write_concurrency_index SELECT s::text, 3*s FROM generate_series(1,2) s; + +step s1-select-all: + SELECT * FROM write_concurrency_index ORDER BY a,b; + +a|b +--------------------------------------------------------------------- +1|3 +2|6 +(2 rows) + +step s2-insert-5: + INSERT INTO write_concurrency_index SELECT s::text, 5*s FROM generate_series(1,2) s; + +step s3-insert-3: + INSERT INTO write_concurrency_index SELECT s::text, 7*s FROM generate_series(3,4) s; + +step s1-commit: + COMMIT; + +step s2-insert-5: <... completed> +ERROR: conflicting key value violates exclusion constraint "write_concurrency_index_a_excl" +step s1-select-all: + SELECT * FROM write_concurrency_index ORDER BY a,b; + +a| b +--------------------------------------------------------------------- +1| 3 +2| 6 +3|21 +4|28 +(4 rows) + + +starting permutation: s1-begin s2-begin s1-insert-2 s1-select-all s2-insert-6 s3-insert-4 s1-rollback s2-rollback s1-select-all +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1-insert-2: + INSERT INTO write_concurrency_index SELECT s::text, 3*s FROM generate_series(1,2) s; + +step s1-select-all: + SELECT * FROM write_concurrency_index ORDER BY a,b; + +a|b +--------------------------------------------------------------------- +1|3 +2|6 +(2 rows) + +step s2-insert-6: + INSERT INTO write_concurrency_index SELECT s::text, 5*s FROM generate_series(3,4) s; + +step s3-insert-4: + INSERT INTO write_concurrency_index SELECT s::text, 7*s FROM generate_series(2,3) s; + +step s1-rollback: + ROLLBACK; + +step s2-rollback: + ROLLBACK; + +step s3-insert-4: <... completed> +step s1-select-all: + SELECT * FROM write_concurrency_index ORDER BY a,b; + +a| b +--------------------------------------------------------------------- +2|14 +3|21 +(2 rows) + + +starting permutation: s1-begin s2-begin-repeatable s1-insert-1 s2-insert-1 s1-commit s2-rollback +step s1-begin: + BEGIN; + +step s2-begin-repeatable: + BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; + +step s1-insert-1: + INSERT INTO write_concurrency_index SELECT (3*s)::text, s FROM generate_series(1,2) s; + +step s2-insert-1: + INSERT INTO write_concurrency_index SELECT (2*s)::text, s FROM generate_series(1,4) s; + +step s1-commit: + COMMIT; + +step s2-insert-1: <... completed> +ERROR: duplicate key value violates unique constraint "write_concurrency_index_b_key" +step s2-rollback: + ROLLBACK; + diff --git a/src/test/regress/spec/columnar_write_concurrency.spec b/src/test/regress/spec/columnar_write_concurrency.spec index 4b29c24a8..e9e07547d 100644 --- a/src/test/regress/spec/columnar_write_concurrency.spec +++ b/src/test/regress/spec/columnar_write_concurrency.spec @@ -44,24 +44,6 @@ step "s1-truncate" TRUNCATE test_insert_concurrency; } -step "s1-verify-metadata" -{ - WITH test_insert_concurrency_stripes AS ( - SELECT first_row_number, stripe_num, row_count - FROM columnar.stripe a, pg_class b - WHERE columnar_relation_storageid(b.oid)=a.storage_id AND - relname = 'test_insert_concurrency' - ) - SELECT - -- verify that table has two stripes .. - count(*) = 2 AND - -- .. and those stripes look like: - sum(case when stripe_num = 1 AND first_row_number = 150001 AND row_count = 3 then 1 end) = 1 AND - sum(case when stripe_num = 2 AND first_row_number = 1 AND row_count = 10000 then 1 end) = 1 - AS stripe_metadata_for_test_insert_concurrency_ok - FROM test_insert_concurrency_stripes; -} - step "s1-commit" { COMMIT; @@ -103,10 +85,6 @@ permutation "s1-begin" "s2-begin" "s1-copy" "s2-insert" "s1-select" "s2-select" // insert vs copy permutation "s1-begin" "s2-begin" "s2-insert" "s1-copy" "s1-select" "s2-select" "s1-commit" "s2-commit" "s1-select" -# insert vs insert -# Start inserting rows in session 1, reserve first_row_number to be 1 for session 1 but commit session 2 before session 1. -# Then verify that while the stripe written by session 2 has the greater first_row_number, stripe written by session 1 has -# the greater stripe_num. This is because, we reserve stripe_num and first_row_number at different times. -permutation "s1-truncate" "s1-begin" "s1-insert-10000-rows" "s2-begin" "s2-insert" "s2-commit" "s1-commit" "s1-verify-metadata" +permutation "s1-truncate" "s1-begin" "s1-insert-10000-rows" "s2-begin" "s2-insert" "s2-commit" "s1-commit" permutation "s1-begin" "s2-begin-repeatable" "s1-insert" "s2-insert" "s2-select" "s1-commit" "s2-select" "s2-commit" diff --git a/src/test/regress/spec/columnar_write_concurrency_index.spec b/src/test/regress/spec/columnar_write_concurrency_index.spec new file mode 100644 index 000000000..4476faa53 --- /dev/null +++ b/src/test/regress/spec/columnar_write_concurrency_index.spec @@ -0,0 +1,174 @@ +setup +{ + CREATE TABLE write_concurrency_index (a text, b int unique, + EXCLUDE USING hash (a WITH =)) USING columnar; +} + +teardown +{ + DROP TABLE IF EXISTS write_concurrency_index CASCADE; +} + +session "s1" + +step "s1-begin" +{ + BEGIN; +} + +step "s1-commit" +{ + COMMIT; +} + +step "s1-rollback" +{ + ROLLBACK; +} + +step "s1-insert-1" +{ + INSERT INTO write_concurrency_index SELECT (3*s)::text, s FROM generate_series(1,2) s; +} + +step "s1-insert-2" +{ + INSERT INTO write_concurrency_index SELECT s::text, 3*s FROM generate_series(1,2) s; +} + +step "s1-copy-1" +{ + COPY write_concurrency_index(b) FROM PROGRAM 'seq 1 2'; +} + +step "s1-select-all" +{ + SELECT * FROM write_concurrency_index ORDER BY a,b; +} + +session "s2" + +step "s2-begin" +{ + BEGIN; +} + +step "s2-begin-repeatable" +{ + BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; +} + +step "s2-commit" +{ + COMMIT; +} + +step "s2-rollback" +{ + ROLLBACK; +} + +step "s2-insert-1" +{ + INSERT INTO write_concurrency_index SELECT (2*s)::text, s FROM generate_series(1,4) s; +} + +step "s2-insert-2" +{ + INSERT INTO write_concurrency_index SELECT (5*s)::text, s FROM generate_series(1,2) s; +} + +step "s2-insert-3" +{ + INSERT INTO write_concurrency_index SELECT (5*s)::text, s FROM generate_series(3,4) s; +} + +step "s2-insert-4" +{ + INSERT INTO write_concurrency_index SELECT s::text, 2*s FROM generate_series(1,4) s; +} + +step "s2-insert-5" +{ + INSERT INTO write_concurrency_index SELECT s::text, 5*s FROM generate_series(1,2) s; +} + +step "s2-insert-6" +{ + INSERT INTO write_concurrency_index SELECT s::text, 5*s FROM generate_series(3,4) s; +} + +step "s2-copy-1" +{ + COPY write_concurrency_index(b) FROM PROGRAM 'seq 1 4'; +} + +step "s2-copy-2" +{ + COPY write_concurrency_index(b) FROM PROGRAM 'seq 3 4'; +} + +step "s2-index-select-all-b" +{ + SET enable_seqscan TO OFF; + SET columnar.enable_custom_scan TO OFF; + SELECT b FROM write_concurrency_index ORDER BY 1; +} + +session "s3" + +step "s3-insert-1" +{ + INSERT INTO write_concurrency_index SELECT (7*s)::text, s FROM generate_series(3,4) s; +} + +step "s3-insert-2" +{ + INSERT INTO write_concurrency_index SELECT (7*s)::text, s FROM generate_series(2,3) s; +} + +step "s3-insert-3" +{ + INSERT INTO write_concurrency_index SELECT s::text, 7*s FROM generate_series(3,4) s; +} + +step "s3-insert-4" +{ + INSERT INTO write_concurrency_index SELECT s::text, 7*s FROM generate_series(2,3) s; +} + +step "s3-index-select-all-b" +{ + SET enable_seqscan TO OFF; + SET columnar.enable_custom_scan TO OFF; + SELECT b FROM write_concurrency_index ORDER BY 1; +} + +# unique (btree) on int column +permutation "s1-begin" "s1-insert-1" "s2-copy-1" "s1-commit" "s1-select-all" +permutation "s1-begin" "s1-copy-1" "s2-insert-1" "s1-rollback" "s1-select-all" +permutation "s1-begin" "s1-copy-1" "s2-insert-2" "s3-insert-1" "s1-commit" "s1-select-all" +permutation "s1-begin" "s1-insert-1" "s2-insert-2" "s3-insert-1" "s1-rollback" "s1-select-all" +permutation "s1-begin" "s2-begin" "s1-insert-1" "s2-insert-3" "s3-insert-2" "s1-commit" "s2-rollback" "s1-select-all" +permutation "s1-begin" "s2-begin" "s1-copy-1" "s2-copy-2" "s3-insert-2" "s1-rollback" "s2-commit" "s1-select-all" +permutation "s1-begin" "s2-begin" "s1-insert-1" "s2-copy-2" "s3-insert-2" "s1-rollback" "s2-rollback" "s1-select-all" + +# exclusion (hash) on text column that checks against duplicate values +permutation "s1-begin" "s1-insert-2" "s2-insert-4" "s1-rollback" "s1-select-all" +permutation "s1-begin" "s1-insert-2" "s2-insert-5" "s3-insert-3" "s1-commit" "s1-select-all" +permutation "s1-begin" "s2-begin" "s1-insert-2" "s2-insert-6" "s3-insert-4" "s1-commit" "s2-rollback" "s1-select-all" +permutation "s1-begin" "s2-begin" "s1-insert-2" "s2-insert-6" "s3-insert-4" "s1-rollback" "s2-rollback" "s1-select-all" + +# make sure that pending writes are not visible to other backends +permutation "s1-begin" "s1-insert-1" "s2-index-select-all-b" "s1-rollback" +permutation "s1-begin" "s2-begin" "s1-insert-1" "s2-copy-2" "s2-index-select-all-b" "s3-index-select-all-b" "s1-commit" "s2-index-select-all-b" "s2-rollback" + +# force flushing write state of s1 before inserting some more data via other sessions +permutation "s1-begin" "s2-begin" "s1-insert-1" "s1-select-all" "s2-insert-1" "s1-commit" "s2-rollback" +permutation "s1-begin" "s2-begin" "s1-insert-1" "s1-select-all" "s2-insert-1" "s1-rollback" "s2-rollback" +permutation "s1-begin" "s1-copy-1" "s1-select-all" "s2-insert-2" "s3-insert-1" "s1-rollback" "s1-select-all" +permutation "s1-begin" "s1-insert-2" "s1-select-all" "s2-insert-5" "s3-insert-3" "s1-commit" "s1-select-all" +permutation "s1-begin" "s2-begin" "s1-insert-2" "s1-select-all" "s2-insert-6" "s3-insert-4" "s1-rollback" "s2-rollback" "s1-select-all" + +# test with repeatable read isolation mode +permutation "s1-begin" "s2-begin-repeatable" "s1-insert-1" "s2-insert-1" "s1-commit" "s2-rollback"