mirror of https://github.com/citusdata/citus.git
Split columnar stripe reservation into two phases (#5188)
Previously, we were doing `first_row_number` reservation for the first row written to current `WriteState` but were doing `stripe_id` reservation when flushing the `WriteState` and were inserting the related record to `columnar.stripe` at that time as well. However, inserting `columnar.stripe` record at flush-time is problematic. This is because, as told in #5160, if relation has any index-based constraints and if there are two concurrent writes that are inserting conflicting key values for that constraint, then postgres relies on `tableAM->fetch_index_tuple` (=`columnar_fetch_index_tuple`) callback to return `true` when indexAM is checking against possible constraint violations. However, pending writes of other backends are not visible to concurrent sessions in columnar since we were not inserting the stripe metadata record until flushing the stripe. With this commit, we split stripe reservation into two phases: i) Reserve `stripe_id` and insert a "dummy" record to `columnar.stripe` at the very same time we reserve `first_row_number`, i.e. when writing the first row to the current `WriteState`. ii) At flush time, do the storage level allocation and complete the missing fields of the dummy record inserted into `columnar.stripe` during i). That way, any concurrent writes would be able to check against possible constraint violations by using `SnapshotDirty` when scanning `columnar.stripe`. Note that `columnar_fetch_index_tuple` still wouldn't be able to fill the output tupleslot for the requested tid but it would at least return `true` for such index look-up's and we believe this should be sufficient for the caller indexAM callback to make the concurrent writer block on prior one. That is how we fix #5160. Only downside of reserving `stripe_id` at the same time we reserve `first_row_number` is that now any aborted writes would also waste some amount of `stripe_id` as in the case of `first_row_number` but we are just wasting them one-by-one. Considering the fact that we waste `first_row_number` by the amount stripe row limit (=150k by default) in such cases, this shouldn't be important at all.pull/5172/head
parent
0bf29200eb
commit
889a2731cb
|
@ -81,10 +81,14 @@ typedef enum RowNumberLookupMode
|
|||
FIND_GREATER
|
||||
} RowNumberLookupMode;
|
||||
|
||||
static void InsertStripeMetadataRow(uint64 storageId, StripeMetadata *stripe);
|
||||
static void InsertEmptyStripeMetadataRow(uint64 storageId, uint64 stripeId,
|
||||
uint32 columnCount, uint32 chunkGroupRowCount,
|
||||
uint64 firstRowNumber);
|
||||
static void GetHighestUsedAddressAndId(uint64 storageId,
|
||||
uint64 *highestUsedAddress,
|
||||
uint64 *highestUsedId);
|
||||
static StripeMetadata * UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId,
|
||||
bool *update, Datum *newValues);
|
||||
static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot);
|
||||
static StripeMetadata * BuildStripeMetadata(Datum *datumArray);
|
||||
static uint32 * ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32
|
||||
|
@ -118,6 +122,7 @@ static bool WriteColumnarOptions(Oid regclass, ColumnarOptions *options, bool ov
|
|||
static StripeMetadata * StripeMetadataLookupRowNumber(Relation relation, uint64 rowNumber,
|
||||
Snapshot snapshot,
|
||||
RowNumberLookupMode lookupMode);
|
||||
static void CheckStripeMetadataConsistency(StripeMetadata *stripeMetadata);
|
||||
|
||||
PG_FUNCTION_INFO_V1(columnar_relation_storageid);
|
||||
|
||||
|
@ -656,8 +661,7 @@ StripeMetadata *
|
|||
FindStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot)
|
||||
{
|
||||
StripeMetadata *stripeMetadata =
|
||||
StripeMetadataLookupRowNumber(relation, rowNumber,
|
||||
snapshot, FIND_LESS_OR_EQUAL);
|
||||
FindStripeWithMatchingFirstRowNumber(relation, rowNumber, snapshot);
|
||||
if (!stripeMetadata)
|
||||
{
|
||||
return NULL;
|
||||
|
@ -672,6 +676,46 @@ FindStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* FindStripeWithMatchingFirstRowNumber returns a StripeMetadata object for
|
||||
* the stripe that has the greatest firstRowNumber among the stripes whose
|
||||
* firstRowNumber is smaller than or equal to given rowNumber. If no such
|
||||
* stripe exists, then returns NULL.
|
||||
*
|
||||
* Note that this doesn't mean that found stripe certainly contains the tuple
|
||||
* with given rowNumber. This is because, it also needs to be verified if
|
||||
* highest row number that found stripe contains is greater than or equal to
|
||||
* given rowNumber. For this reason, unless that additional check is done,
|
||||
* this function is mostly useful for checking against "possible" constraint
|
||||
* violations due to concurrent writes that are not flushed by other backends
|
||||
* yet.
|
||||
*/
|
||||
StripeMetadata *
|
||||
FindStripeWithMatchingFirstRowNumber(Relation relation, uint64 rowNumber,
|
||||
Snapshot snapshot)
|
||||
{
|
||||
return StripeMetadataLookupRowNumber(relation, rowNumber, snapshot,
|
||||
FIND_LESS_OR_EQUAL);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* StripeIsFlushed returns true if stripe with stripeMetadata is flushed to
|
||||
* disk.
|
||||
*/
|
||||
bool
|
||||
StripeIsFlushed(StripeMetadata *stripeMetadata)
|
||||
{
|
||||
/*
|
||||
* We insert dummy stripe metadata entry when inserting the first row.
|
||||
* For this reason, rowCount being equal to 0 cannot mean a valid stripe
|
||||
* with 0 rows but a stripe that is not flushed to disk, probably because
|
||||
* of an aborted xact.
|
||||
*/
|
||||
return stripeMetadata->rowCount > 0;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* StripeGetHighestRowNumber returns rowNumber of the row with highest
|
||||
* rowNumber in given stripe.
|
||||
|
@ -744,6 +788,7 @@ StripeMetadataLookupRowNumber(Relation relation, uint64 rowNumber, Snapshot snap
|
|||
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
||||
|
||||
foundStripeMetadata = BuildStripeMetadata(datumArray);
|
||||
CheckStripeMetadataConsistency(foundStripeMetadata);
|
||||
}
|
||||
|
||||
systable_endscan_ordered(scanDescriptor);
|
||||
|
@ -754,6 +799,35 @@ StripeMetadataLookupRowNumber(Relation relation, uint64 rowNumber, Snapshot snap
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* CheckStripeMetadataConsistency errors out if given StripeMetadata object
|
||||
* belongs to an un-flushed stripe but some fields of it contradicts with
|
||||
* this fact.
|
||||
*/
|
||||
static void
|
||||
CheckStripeMetadataConsistency(StripeMetadata *stripeMetadata)
|
||||
{
|
||||
if (StripeIsFlushed(stripeMetadata))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if (stripeMetadata->rowCount > 0 || stripeMetadata->chunkCount > 0 ||
|
||||
stripeMetadata->fileOffset != ColumnarInvalidLogicalOffset ||
|
||||
stripeMetadata->dataLength > 0)
|
||||
{
|
||||
/*
|
||||
* If stripe was not flushed to disk, then values of given four
|
||||
* fields should match the columns inserted by
|
||||
* InsertEmptyStripeMetadataRow.
|
||||
*/
|
||||
ereport(ERROR, (errmsg("unexpected stripe state, stripe with id="
|
||||
UINT64_FORMAT " was not flushed properly",
|
||||
stripeMetadata->id)));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* FindStripeWithHighestRowNumber returns StripeMetadata for the stripe that
|
||||
* has the row with highest rowNumber by doing backward index scan on
|
||||
|
@ -856,23 +930,36 @@ ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount,
|
|||
|
||||
|
||||
/*
|
||||
* InsertStripeMetadataRow adds a row to columnar.stripe.
|
||||
* InsertEmptyStripeMetadataRow adds a row to columnar.stripe for the empty
|
||||
* stripe reservation made for stripeId.
|
||||
*/
|
||||
static void
|
||||
InsertStripeMetadataRow(uint64 storageId, StripeMetadata *stripe)
|
||||
InsertEmptyStripeMetadataRow(uint64 storageId, uint64 stripeId, uint32 columnCount,
|
||||
uint32 chunkGroupRowCount, uint64 firstRowNumber)
|
||||
{
|
||||
bool nulls[Natts_columnar_stripe] = { 0 };
|
||||
Datum values[Natts_columnar_stripe] = {
|
||||
UInt64GetDatum(storageId),
|
||||
Int64GetDatum(stripe->id),
|
||||
Int64GetDatum(stripe->fileOffset),
|
||||
Int64GetDatum(stripe->dataLength),
|
||||
Int32GetDatum(stripe->columnCount),
|
||||
Int32GetDatum(stripe->chunkGroupRowCount),
|
||||
Int64GetDatum(stripe->rowCount),
|
||||
Int32GetDatum(stripe->chunkCount),
|
||||
UInt64GetDatum(stripe->firstRowNumber)
|
||||
};
|
||||
bool nulls[Natts_columnar_stripe] = { false };
|
||||
|
||||
Datum values[Natts_columnar_stripe] = { 0 };
|
||||
values[Anum_columnar_stripe_storageid - 1] =
|
||||
UInt64GetDatum(storageId);
|
||||
values[Anum_columnar_stripe_stripe - 1] =
|
||||
UInt64GetDatum(stripeId);
|
||||
values[Anum_columnar_stripe_column_count - 1] =
|
||||
UInt32GetDatum(columnCount);
|
||||
values[Anum_columnar_stripe_chunk_row_count - 1] =
|
||||
UInt32GetDatum(chunkGroupRowCount);
|
||||
values[Anum_columnar_stripe_first_row_number - 1] =
|
||||
UInt64GetDatum(firstRowNumber);
|
||||
|
||||
/* stripe has no rows yet, so initialize rest of the columns accordingly */
|
||||
values[Anum_columnar_stripe_row_count - 1] =
|
||||
UInt64GetDatum(0);
|
||||
values[Anum_columnar_stripe_file_offset - 1] =
|
||||
UInt64GetDatum(ColumnarInvalidLogicalOffset);
|
||||
values[Anum_columnar_stripe_data_length - 1] =
|
||||
UInt64GetDatum(0);
|
||||
values[Anum_columnar_stripe_chunk_count - 1] =
|
||||
UInt32GetDatum(0);
|
||||
|
||||
Oid columnarStripesOid = ColumnarStripeRelationId();
|
||||
Relation columnarStripes = table_open(columnarStripesOid, RowExclusiveLock);
|
||||
|
@ -953,35 +1040,123 @@ GetHighestUsedAddressAndId(uint64 storageId,
|
|||
|
||||
|
||||
/*
|
||||
* ReserveStripe reserves and stripe of given size for the given relation,
|
||||
* ReserveEmptyStripe reserves an empty stripe for given relation
|
||||
* and inserts it into columnar.stripe. It is guaranteed that concurrent
|
||||
* writes won't overwrite the returned stripe.
|
||||
*/
|
||||
StripeMetadata
|
||||
ReserveStripe(Relation rel, uint64 sizeBytes,
|
||||
uint64 rowCount, uint64 columnCount,
|
||||
uint64 chunkCount, uint64 chunkGroupRowCount,
|
||||
uint64 stripeFirstRowNumber)
|
||||
EmptyStripeReservation *
|
||||
ReserveEmptyStripe(Relation rel, uint64 columnCount, uint64 chunkGroupRowCount,
|
||||
uint64 stripeRowCount)
|
||||
{
|
||||
StripeMetadata stripe = { 0 };
|
||||
EmptyStripeReservation *stripeReservation = palloc0(sizeof(EmptyStripeReservation));
|
||||
|
||||
uint64 storageId = ColumnarStorageGetStorageId(rel, false);
|
||||
|
||||
uint64 stripeId = ColumnarStorageReserveStripe(rel);
|
||||
stripeReservation->stripeId = ColumnarStorageReserveStripeId(rel);
|
||||
stripeReservation->stripeFirstRowNumber =
|
||||
ColumnarStorageReserveRowNumber(rel, stripeRowCount);
|
||||
|
||||
/*
|
||||
* XXX: Instead of inserting a dummy entry to columnar.stripe and
|
||||
* updating it when flushing the stripe, we could have a hash table
|
||||
* in shared memory for the bookkeeping of ongoing writes.
|
||||
*/
|
||||
InsertEmptyStripeMetadataRow(storageId, stripeReservation->stripeId,
|
||||
columnCount, chunkGroupRowCount,
|
||||
stripeReservation->stripeFirstRowNumber);
|
||||
|
||||
return stripeReservation;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CompleteStripeReservation completes reservation of the stripe with
|
||||
* stripeId for given size and in-place updates related stripe metadata tuple
|
||||
* to complete reservation.
|
||||
*/
|
||||
StripeMetadata *
|
||||
CompleteStripeReservation(Relation rel, uint64 stripeId, uint64 sizeBytes,
|
||||
uint64 rowCount, uint64 chunkCount)
|
||||
{
|
||||
uint64 resLogicalStart = ColumnarStorageReserveData(rel, sizeBytes);
|
||||
uint64 storageId = ColumnarStorageGetStorageId(rel, false);
|
||||
|
||||
stripe.fileOffset = resLogicalStart;
|
||||
stripe.dataLength = sizeBytes;
|
||||
stripe.chunkCount = chunkCount;
|
||||
stripe.chunkGroupRowCount = chunkGroupRowCount;
|
||||
stripe.columnCount = columnCount;
|
||||
stripe.rowCount = rowCount;
|
||||
stripe.id = stripeId;
|
||||
stripe.firstRowNumber = stripeFirstRowNumber;
|
||||
bool update[Natts_columnar_stripe] = { false };
|
||||
update[Anum_columnar_stripe_file_offset - 1] = true;
|
||||
update[Anum_columnar_stripe_data_length - 1] = true;
|
||||
update[Anum_columnar_stripe_row_count - 1] = true;
|
||||
update[Anum_columnar_stripe_chunk_count - 1] = true;
|
||||
|
||||
InsertStripeMetadataRow(storageId, &stripe);
|
||||
Datum newValues[Natts_columnar_stripe] = { 0 };
|
||||
newValues[Anum_columnar_stripe_file_offset - 1] = Int64GetDatum(resLogicalStart);
|
||||
newValues[Anum_columnar_stripe_data_length - 1] = Int64GetDatum(sizeBytes);
|
||||
newValues[Anum_columnar_stripe_row_count - 1] = UInt64GetDatum(rowCount);
|
||||
newValues[Anum_columnar_stripe_chunk_count - 1] = Int32GetDatum(chunkCount);
|
||||
|
||||
return stripe;
|
||||
return UpdateStripeMetadataRow(storageId, stripeId, update, newValues);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* UpdateStripeMetadataRow updates stripe metadata tuple for the stripe with
|
||||
* stripeId according to given newValues and update arrays.
|
||||
* Note that this function shouldn't be used for the cases where any indexes
|
||||
* of stripe metadata should be updated according to modifications done.
|
||||
*/
|
||||
static StripeMetadata *
|
||||
UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, bool *update,
|
||||
Datum *newValues)
|
||||
{
|
||||
SnapshotData dirtySnapshot;
|
||||
InitDirtySnapshot(dirtySnapshot);
|
||||
|
||||
ScanKeyData scanKey[2];
|
||||
ScanKeyInit(&scanKey[0], Anum_columnar_stripe_storageid,
|
||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(storageId));
|
||||
ScanKeyInit(&scanKey[1], Anum_columnar_stripe_stripe,
|
||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripeId));
|
||||
|
||||
Oid columnarStripesOid = ColumnarStripeRelationId();
|
||||
|
||||
Relation columnarStripes = table_open(columnarStripesOid, AccessShareLock);
|
||||
Relation columnarStripePkeyIndex = index_open(ColumnarStripePKeyIndexRelationId(),
|
||||
AccessShareLock);
|
||||
|
||||
SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarStripes,
|
||||
columnarStripePkeyIndex,
|
||||
&dirtySnapshot, 2, scanKey);
|
||||
|
||||
HeapTuple oldTuple = systable_getnext_ordered(scanDescriptor, ForwardScanDirection);
|
||||
if (!HeapTupleIsValid(oldTuple))
|
||||
{
|
||||
ereport(ERROR, (errmsg("attempted to modify an unexpected stripe, "
|
||||
"columnar storage with id=" UINT64_FORMAT
|
||||
" does not have stripe with id=" UINT64_FORMAT,
|
||||
storageId, stripeId)));
|
||||
}
|
||||
|
||||
/*
|
||||
* heap_inplace_update already doesn't allow changing size of the original
|
||||
* tuple, so we don't allow setting any Datum's to NULL values.
|
||||
*/
|
||||
bool newNulls[Natts_columnar_stripe] = { false };
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes);
|
||||
HeapTuple modifiedTuple = heap_modify_tuple(oldTuple, tupleDescriptor,
|
||||
newValues, newNulls, update);
|
||||
|
||||
heap_inplace_update(columnarStripes, modifiedTuple);
|
||||
|
||||
CommandCounterIncrement();
|
||||
|
||||
systable_endscan_ordered(scanDescriptor);
|
||||
index_close(columnarStripePkeyIndex, AccessShareLock);
|
||||
table_close(columnarStripes, AccessShareLock);
|
||||
|
||||
/* return StripeMetadata object built from modified tuple */
|
||||
Datum datumArray[Natts_columnar_stripe];
|
||||
bool isNullArray[Natts_columnar_stripe];
|
||||
heap_deform_tuple(modifiedTuple, tupleDescriptor, datumArray, isNullArray);
|
||||
return BuildStripeMetadata(datumArray);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -38,6 +38,10 @@
|
|||
#include "columnar/columnar_tableam.h"
|
||||
#include "columnar/columnar_version_compat.h"
|
||||
|
||||
#define UNEXPECTED_STRIPE_READ_ERR_MSG \
|
||||
"attempted to read an unexpected stripe while reading columnar " \
|
||||
"table %s, stripe with id=" UINT64_FORMAT " is not flushed"
|
||||
|
||||
typedef struct ChunkGroupReadState
|
||||
{
|
||||
int64 currentRow;
|
||||
|
@ -115,6 +119,7 @@ static StripeReadState * BeginStripeRead(StripeMetadata *stripeMetadata, Relatio
|
|||
MemoryContext stripeReadContext,
|
||||
Snapshot snapshot);
|
||||
static void AdvanceStripeRead(ColumnarReadState *readState);
|
||||
static bool SnapshotMightSeeUnflushedStripes(Snapshot snapshot);
|
||||
static bool ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues,
|
||||
bool *columnNulls);
|
||||
static ChunkGroupReadState * BeginChunkGroupRead(StripeBuffers *stripeBuffers, int
|
||||
|
@ -283,6 +288,18 @@ ColumnarReadRowByRowNumber(ColumnarReadState *readState,
|
|||
return false;
|
||||
}
|
||||
|
||||
if (!StripeIsFlushed(stripeMetadata))
|
||||
{
|
||||
/*
|
||||
* Callers are expected to skip stripes that are not flushed to
|
||||
* disk yet or should wait for the writer xact to commit or abort,
|
||||
* but let's be on the safe side.
|
||||
*/
|
||||
ereport(ERROR, (errmsg(UNEXPECTED_STRIPE_READ_ERR_MSG,
|
||||
RelationGetRelationName(columnarRelation),
|
||||
stripeMetadata->id)));
|
||||
}
|
||||
|
||||
/* do the cleanup before reading a new stripe */
|
||||
ColumnarResetRead(readState);
|
||||
|
||||
|
@ -562,6 +579,30 @@ AdvanceStripeRead(ColumnarReadState *readState)
|
|||
readState->currentStripeMetadata = FindNextStripeByRowNumber(readState->relation,
|
||||
lastReadRowNumber,
|
||||
readState->snapshot);
|
||||
|
||||
if (readState->currentStripeMetadata &&
|
||||
!StripeIsFlushed(readState->currentStripeMetadata) &&
|
||||
!SnapshotMightSeeUnflushedStripes(readState->snapshot))
|
||||
{
|
||||
/*
|
||||
* To be on the safe side, error out if we don't expect to encounter
|
||||
* with an un-flushed stripe. Otherwise, we will skip such stripes
|
||||
* until finding a flushed one.
|
||||
*/
|
||||
ereport(ERROR, (errmsg(UNEXPECTED_STRIPE_READ_ERR_MSG,
|
||||
RelationGetRelationName(readState->relation),
|
||||
readState->currentStripeMetadata->id)));
|
||||
}
|
||||
|
||||
while (readState->currentStripeMetadata &&
|
||||
!StripeIsFlushed(readState->currentStripeMetadata))
|
||||
{
|
||||
readState->currentStripeMetadata =
|
||||
FindNextStripeByRowNumber(readState->relation,
|
||||
readState->currentStripeMetadata->firstRowNumber,
|
||||
readState->snapshot);
|
||||
}
|
||||
|
||||
readState->stripeReadState = NULL;
|
||||
MemoryContextReset(readState->stripeReadContext);
|
||||
|
||||
|
@ -569,6 +610,34 @@ AdvanceStripeRead(ColumnarReadState *readState)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* SnapshotMightSeeUnflushedStripes returns true if given snapshot is
|
||||
* expected to see un-flushed stripes either because of other backends'
|
||||
* pending writes or aborted transactions.
|
||||
*/
|
||||
static bool
|
||||
SnapshotMightSeeUnflushedStripes(Snapshot snapshot)
|
||||
{
|
||||
if (snapshot == InvalidSnapshot)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
switch (snapshot->snapshot_type)
|
||||
{
|
||||
case SNAPSHOT_ANY:
|
||||
case SNAPSHOT_DIRTY:
|
||||
case SNAPSHOT_NON_VACUUMABLE:
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ReadStripeNextRow: If more rows can be read from the current stripe, fill
|
||||
* in non-NULL columnValues and return true. Otherwise, return false.
|
||||
|
|
|
@ -371,15 +371,13 @@ ColumnarStorageReserveRowNumber(Relation rel, uint64 nrows)
|
|||
|
||||
|
||||
/*
|
||||
* ColumnarStorageReserveStripe returns stripeId and advances it for next
|
||||
* ColumnarStorageReserveStripeId returns stripeId and advances it for next
|
||||
* stripeId reservation.
|
||||
* Note that this function doesn't handle row number reservation.
|
||||
* This is because, unlike stripeId reservation, we immediately reserve
|
||||
* row number during writes, not when flushing stripes to disk.
|
||||
* See ColumnarStorageReserveRowNumber function.
|
||||
*/
|
||||
uint64
|
||||
ColumnarStorageReserveStripe(Relation rel)
|
||||
ColumnarStorageReserveStripeId(Relation rel)
|
||||
{
|
||||
LockRelationForExtension(rel, ExclusiveLock);
|
||||
|
||||
|
|
|
@ -547,12 +547,46 @@ columnar_index_fetch_tuple(struct IndexFetchTableData *sscan,
|
|||
}
|
||||
|
||||
uint64 rowNumber = tid_to_row_number(*tid);
|
||||
if (!ColumnarReadRowByRowNumber(scan->cs_readState, rowNumber, slot->tts_values,
|
||||
slot->tts_isnull))
|
||||
StripeMetadata *stripeMetadata =
|
||||
FindStripeWithMatchingFirstRowNumber(columnarRelation, rowNumber, snapshot);
|
||||
if (!stripeMetadata)
|
||||
{
|
||||
/* it is certain that tuple with rowNumber doesn't exist */
|
||||
return false;
|
||||
}
|
||||
|
||||
if (StripeIsFlushed(stripeMetadata) &&
|
||||
!ColumnarReadRowByRowNumber(scan->cs_readState, rowNumber,
|
||||
slot->tts_values, slot->tts_isnull))
|
||||
{
|
||||
/*
|
||||
* FindStripeWithMatchingFirstRowNumber doesn't verify upper row
|
||||
* number boundary of found stripe. For this reason, we didn't
|
||||
* certainly know if given row number belongs to one of the stripes.
|
||||
*/
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!StripeIsFlushed(stripeMetadata))
|
||||
{
|
||||
/*
|
||||
* We only expect to see un-flushed stripes when checking against
|
||||
* constraint violation. In that case, indexAM provides dirty
|
||||
* snapshot to index_fetch_tuple callback.
|
||||
*/
|
||||
Assert(snapshot->snapshot_type == SNAPSHOT_DIRTY);
|
||||
|
||||
/*
|
||||
* Stripe that "might" contain the tuple with rowNumber is not
|
||||
* flushed yet. Here we set all attributes of given tupleslot to NULL
|
||||
* before returning true and expect the indexAM callback that called
|
||||
* us --possibly to check against constraint violation-- blocks until
|
||||
* writer transaction commits or aborts, without requiring us to fill
|
||||
* the tupleslot properly.
|
||||
*/
|
||||
memset(slot->tts_isnull, true, slot->tts_nvalid);
|
||||
}
|
||||
|
||||
slot->tts_tableOid = RelationGetRelid(columnarRelation);
|
||||
slot->tts_tid = *tid;
|
||||
ExecStoreVirtualTuple(slot);
|
||||
|
@ -1393,7 +1427,8 @@ ColumnarGetHighestItemPointer(Relation relation, Snapshot snapshot)
|
|||
{
|
||||
StripeMetadata *stripeWithHighestRowNumber =
|
||||
FindStripeWithHighestRowNumber(relation, snapshot);
|
||||
if (stripeWithHighestRowNumber == NULL)
|
||||
if (stripeWithHighestRowNumber == NULL ||
|
||||
StripeGetHighestRowNumber(stripeWithHighestRowNumber) == 0)
|
||||
{
|
||||
/* table is empty according to our snapshot */
|
||||
ItemPointerData invalidItemPtr;
|
||||
|
|
|
@ -43,7 +43,7 @@ struct ColumnarWriteState
|
|||
MemoryContext perTupleContext;
|
||||
StripeBuffers *stripeBuffers;
|
||||
StripeSkipList *stripeSkipList;
|
||||
uint64 stripeFirstRowNumber;
|
||||
EmptyStripeReservation *emptyStripeReservation;
|
||||
ColumnarOptions options;
|
||||
ChunkData *chunkData;
|
||||
|
||||
|
@ -130,7 +130,7 @@ ColumnarBeginWrite(RelFileNode relfilenode,
|
|||
writeState->comparisonFunctionArray = comparisonFunctionArray;
|
||||
writeState->stripeBuffers = NULL;
|
||||
writeState->stripeSkipList = NULL;
|
||||
writeState->stripeFirstRowNumber = COLUMNAR_INVALID_ROW_NUMBER;
|
||||
writeState->emptyStripeReservation = NULL;
|
||||
writeState->stripeWriteContext = stripeWriteContext;
|
||||
writeState->chunkData = chunkData;
|
||||
writeState->compressionBuffer = NULL;
|
||||
|
@ -177,9 +177,9 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu
|
|||
Oid relationId = RelidByRelfilenode(writeState->relfilenode.spcNode,
|
||||
writeState->relfilenode.relNode);
|
||||
Relation relation = relation_open(relationId, NoLock);
|
||||
writeState->stripeFirstRowNumber =
|
||||
ColumnarStorageReserveRowNumber(relation,
|
||||
options->stripeRowCount);
|
||||
writeState->emptyStripeReservation =
|
||||
ReserveEmptyStripe(relation, columnCount, chunkRowCount,
|
||||
options->stripeRowCount);
|
||||
relation_close(relation, NoLock);
|
||||
|
||||
/*
|
||||
|
@ -238,7 +238,8 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu
|
|||
SerializeChunkData(writeState, chunkIndex, chunkRowCount);
|
||||
}
|
||||
|
||||
uint64 writtenRowNumber = writeState->stripeFirstRowNumber + stripeBuffers->rowCount;
|
||||
uint64 writtenRowNumber = writeState->emptyStripeReservation->stripeFirstRowNumber +
|
||||
stripeBuffers->rowCount;
|
||||
stripeBuffers->rowCount++;
|
||||
if (stripeBuffers->rowCount >= options->stripeRowCount)
|
||||
{
|
||||
|
@ -376,7 +377,6 @@ CreateEmptyStripeSkipList(uint32 stripeMaxRowCount, uint32 chunkRowCount,
|
|||
static void
|
||||
FlushStripe(ColumnarWriteState *writeState)
|
||||
{
|
||||
StripeMetadata stripeMetadata = { 0 };
|
||||
uint32 columnIndex = 0;
|
||||
uint32 chunkIndex = 0;
|
||||
StripeBuffers *stripeBuffers = writeState->stripeBuffers;
|
||||
|
@ -442,11 +442,11 @@ FlushStripe(ColumnarWriteState *writeState)
|
|||
}
|
||||
}
|
||||
|
||||
stripeMetadata = ReserveStripe(relation, stripeSize,
|
||||
stripeRowCount, columnCount, chunkCount,
|
||||
chunkRowCount, writeState->stripeFirstRowNumber);
|
||||
StripeMetadata *stripeMetadata =
|
||||
CompleteStripeReservation(relation, writeState->emptyStripeReservation->stripeId,
|
||||
stripeSize, stripeRowCount, chunkCount);
|
||||
|
||||
uint64 currentFileOffset = stripeMetadata.fileOffset;
|
||||
uint64 currentFileOffset = stripeMetadata->fileOffset;
|
||||
|
||||
/*
|
||||
* Each stripe has only one section:
|
||||
|
@ -487,10 +487,10 @@ FlushStripe(ColumnarWriteState *writeState)
|
|||
}
|
||||
|
||||
SaveChunkGroups(writeState->relfilenode,
|
||||
stripeMetadata.id,
|
||||
stripeMetadata->id,
|
||||
writeState->chunkGroupRowCounts);
|
||||
SaveStripeSkipList(writeState->relfilenode,
|
||||
stripeMetadata.id,
|
||||
stripeMetadata->id,
|
||||
stripeSkipList, tupleDescriptor);
|
||||
|
||||
writeState->chunkGroupRowCounts = NIL;
|
||||
|
|
|
@ -246,10 +246,12 @@ extern bool IsColumnarTableAmTable(Oid relationId);
|
|||
extern void DeleteMetadataRows(RelFileNode relfilenode);
|
||||
extern uint64 ColumnarMetadataNewStorageId(void);
|
||||
extern uint64 GetHighestUsedAddress(RelFileNode relfilenode);
|
||||
extern StripeMetadata ReserveStripe(Relation rel, uint64 size,
|
||||
uint64 rowCount, uint64 columnCount,
|
||||
uint64 chunkCount, uint64 chunkGroupRowCount,
|
||||
uint64 stripeFirstRowNumber);
|
||||
extern EmptyStripeReservation * ReserveEmptyStripe(Relation rel, uint64 columnCount,
|
||||
uint64 chunkGroupRowCount,
|
||||
uint64 stripeRowCount);
|
||||
extern StripeMetadata * CompleteStripeReservation(Relation rel, uint64 stripeId,
|
||||
uint64 sizeBytes, uint64 rowCount,
|
||||
uint64 chunkCount);
|
||||
extern void SaveStripeSkipList(RelFileNode relfilenode, uint64 stripe,
|
||||
StripeSkipList *stripeSkipList,
|
||||
TupleDesc tupleDescriptor);
|
||||
|
@ -263,6 +265,10 @@ extern StripeMetadata * FindNextStripeByRowNumber(Relation relation, uint64 rowN
|
|||
Snapshot snapshot);
|
||||
extern StripeMetadata * FindStripeByRowNumber(Relation relation, uint64 rowNumber,
|
||||
Snapshot snapshot);
|
||||
extern StripeMetadata * FindStripeWithMatchingFirstRowNumber(Relation relation,
|
||||
uint64 rowNumber,
|
||||
Snapshot snapshot);
|
||||
extern bool StripeIsFlushed(StripeMetadata *stripeMetadata);
|
||||
extern uint64 StripeGetHighestRowNumber(StripeMetadata *stripeMetadata);
|
||||
extern StripeMetadata * FindStripeWithHighestRowNumber(Relation relation,
|
||||
Snapshot snapshot);
|
||||
|
|
|
@ -28,6 +28,16 @@ typedef struct StripeMetadata
|
|||
uint64 firstRowNumber;
|
||||
} StripeMetadata;
|
||||
|
||||
/*
|
||||
* EmptyStripeReservation represents information for an empty stripe
|
||||
* reservation.
|
||||
*/
|
||||
typedef struct EmptyStripeReservation
|
||||
{
|
||||
uint64 stripeId;
|
||||
uint64 stripeFirstRowNumber;
|
||||
} EmptyStripeReservation;
|
||||
|
||||
extern List * StripesForRelfilenode(RelFileNode relfilenode);
|
||||
extern void ColumnarStorageUpdateIfNeeded(Relation rel, bool isUpgrade);
|
||||
|
||||
|
|
|
@ -53,7 +53,7 @@ extern uint64 ColumnarStorageGetReservedOffset(Relation rel, bool force);
|
|||
|
||||
extern uint64 ColumnarStorageReserveData(Relation rel, uint64 amount);
|
||||
extern uint64 ColumnarStorageReserveRowNumber(Relation rel, uint64 nrows);
|
||||
extern uint64 ColumnarStorageReserveStripe(Relation rel);
|
||||
extern uint64 ColumnarStorageReserveStripeId(Relation rel);
|
||||
|
||||
extern void ColumnarStorageRead(Relation rel, uint64 logicalOffset,
|
||||
char *data, uint32 amount);
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
test: columnar_write_concurrency
|
||||
test: columnar_write_concurrency columnar_write_concurrency_index
|
||||
test: columnar_vacuum_vs_insert
|
||||
test: columnar_temp_tables
|
||||
test: columnar_index_concurrency
|
||||
|
|
|
@ -19,7 +19,7 @@ select
|
|||
from columnar_test_helpers.columnar_storage_info('t');
|
||||
version_major | version_minor | reserved_stripe_id | reserved_row_number
|
||||
---------------------------------------------------------------------
|
||||
2 | 0 | 1 | 150001
|
||||
2 | 0 | 2 | 150001
|
||||
(1 row)
|
||||
|
||||
-- check stripe metadata also have been rolled-back
|
||||
|
@ -59,7 +59,7 @@ select
|
|||
from columnar_test_helpers.columnar_storage_info('t');
|
||||
version_major | version_minor | reserved_stripe_id | reserved_row_number
|
||||
---------------------------------------------------------------------
|
||||
2 | 0 | 3 | 600001
|
||||
2 | 0 | 5 | 600001
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM t;
|
||||
|
@ -89,7 +89,7 @@ select
|
|||
from columnar_test_helpers.columnar_storage_info('t');
|
||||
version_major | version_minor | reserved_stripe_id | reserved_row_number
|
||||
---------------------------------------------------------------------
|
||||
2 | 0 | 5 | 750001
|
||||
2 | 0 | 6 | 750001
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM t;
|
||||
|
|
|
@ -244,7 +244,7 @@ select
|
|||
from columnar_test_helpers.columnar_storage_info('t');
|
||||
version_major | version_minor | reserved_stripe_id | reserved_row_number
|
||||
---------------------------------------------------------------------
|
||||
2 | 0 | 16 | 21001
|
||||
2 | 0 | 18 | 21001
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM columnar_test_helpers.chunk_group_consistency;
|
||||
|
|
|
@ -159,7 +159,7 @@ step s1-select:
|
|||
(6 rows)
|
||||
|
||||
|
||||
starting permutation: s1-truncate s1-begin s1-insert-10000-rows s2-begin s2-insert s2-commit s1-commit s1-verify-metadata
|
||||
starting permutation: s1-truncate s1-begin s1-insert-10000-rows s2-begin s2-insert s2-commit s1-commit
|
||||
step s1-truncate:
|
||||
TRUNCATE test_insert_concurrency;
|
||||
|
||||
|
@ -181,27 +181,6 @@ step s2-commit:
|
|||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s1-verify-metadata:
|
||||
WITH test_insert_concurrency_stripes AS (
|
||||
SELECT first_row_number, stripe_num, row_count
|
||||
FROM columnar.stripe a, pg_class b
|
||||
WHERE columnar_relation_storageid(b.oid)=a.storage_id AND
|
||||
relname = 'test_insert_concurrency'
|
||||
)
|
||||
SELECT
|
||||
-- verify that table has two stripes ..
|
||||
count(*) = 2 AND
|
||||
-- .. and those stripes look like:
|
||||
sum(case when stripe_num = 1 AND first_row_number = 150001 AND row_count = 3 then 1 end) = 1 AND
|
||||
sum(case when stripe_num = 2 AND first_row_number = 1 AND row_count = 10000 then 1 end) = 1
|
||||
AS stripe_metadata_for_test_insert_concurrency_ok
|
||||
FROM test_insert_concurrency_stripes;
|
||||
|
||||
stripe_metadata_for_test_insert_concurrency_ok
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
|
||||
starting permutation: s1-begin s2-begin-repeatable s1-insert s2-insert s2-select s1-commit s2-select s2-commit
|
||||
step s1-begin:
|
||||
|
|
|
@ -0,0 +1,610 @@
|
|||
Parsed test spec with 3 sessions
|
||||
|
||||
starting permutation: s1-begin s1-insert-1 s2-copy-1 s1-commit s1-select-all
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-insert-1:
|
||||
INSERT INTO write_concurrency_index SELECT (3*s)::text, s FROM generate_series(1,2) s;
|
||||
|
||||
step s2-copy-1:
|
||||
COPY write_concurrency_index(b) FROM PROGRAM 'seq 1 4';
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-copy-1: <... completed>
|
||||
ERROR: duplicate key value violates unique constraint "write_concurrency_index_b_key"
|
||||
step s1-select-all:
|
||||
SELECT * FROM write_concurrency_index ORDER BY a,b;
|
||||
|
||||
a|b
|
||||
---------------------------------------------------------------------
|
||||
3|1
|
||||
6|2
|
||||
(2 rows)
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-copy-1 s2-insert-1 s1-rollback s1-select-all
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-copy-1:
|
||||
COPY write_concurrency_index(b) FROM PROGRAM 'seq 1 2';
|
||||
|
||||
step s2-insert-1:
|
||||
INSERT INTO write_concurrency_index SELECT (2*s)::text, s FROM generate_series(1,4) s;
|
||||
<waiting ...>
|
||||
step s1-rollback:
|
||||
ROLLBACK;
|
||||
|
||||
step s2-insert-1: <... completed>
|
||||
step s1-select-all:
|
||||
SELECT * FROM write_concurrency_index ORDER BY a,b;
|
||||
|
||||
a|b
|
||||
---------------------------------------------------------------------
|
||||
2|1
|
||||
4|2
|
||||
6|3
|
||||
8|4
|
||||
(4 rows)
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-copy-1 s2-insert-2 s3-insert-1 s1-commit s1-select-all
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-copy-1:
|
||||
COPY write_concurrency_index(b) FROM PROGRAM 'seq 1 2';
|
||||
|
||||
step s2-insert-2:
|
||||
INSERT INTO write_concurrency_index SELECT (5*s)::text, s FROM generate_series(1,2) s;
|
||||
<waiting ...>
|
||||
step s3-insert-1:
|
||||
INSERT INTO write_concurrency_index SELECT (7*s)::text, s FROM generate_series(3,4) s;
|
||||
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-insert-2: <... completed>
|
||||
ERROR: duplicate key value violates unique constraint "write_concurrency_index_b_key"
|
||||
step s1-select-all:
|
||||
SELECT * FROM write_concurrency_index ORDER BY a,b;
|
||||
|
||||
a|b
|
||||
---------------------------------------------------------------------
|
||||
21|3
|
||||
28|4
|
||||
|1
|
||||
|2
|
||||
(4 rows)
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-insert-1 s2-insert-2 s3-insert-1 s1-rollback s1-select-all
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-insert-1:
|
||||
INSERT INTO write_concurrency_index SELECT (3*s)::text, s FROM generate_series(1,2) s;
|
||||
|
||||
step s2-insert-2:
|
||||
INSERT INTO write_concurrency_index SELECT (5*s)::text, s FROM generate_series(1,2) s;
|
||||
<waiting ...>
|
||||
step s3-insert-1:
|
||||
INSERT INTO write_concurrency_index SELECT (7*s)::text, s FROM generate_series(3,4) s;
|
||||
|
||||
step s1-rollback:
|
||||
ROLLBACK;
|
||||
|
||||
step s2-insert-2: <... completed>
|
||||
step s1-select-all:
|
||||
SELECT * FROM write_concurrency_index ORDER BY a,b;
|
||||
|
||||
a|b
|
||||
---------------------------------------------------------------------
|
||||
10|2
|
||||
21|3
|
||||
28|4
|
||||
5|1
|
||||
(4 rows)
|
||||
|
||||
|
||||
starting permutation: s1-begin s2-begin s1-insert-1 s2-insert-3 s3-insert-2 s1-commit s2-rollback s1-select-all
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-insert-1:
|
||||
INSERT INTO write_concurrency_index SELECT (3*s)::text, s FROM generate_series(1,2) s;
|
||||
|
||||
step s2-insert-3:
|
||||
INSERT INTO write_concurrency_index SELECT (5*s)::text, s FROM generate_series(3,4) s;
|
||||
|
||||
step s3-insert-2:
|
||||
INSERT INTO write_concurrency_index SELECT (7*s)::text, s FROM generate_series(2,3) s;
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s3-insert-2: <... completed>
|
||||
ERROR: duplicate key value violates unique constraint "write_concurrency_index_b_key"
|
||||
step s2-rollback:
|
||||
ROLLBACK;
|
||||
|
||||
step s1-select-all:
|
||||
SELECT * FROM write_concurrency_index ORDER BY a,b;
|
||||
|
||||
a|b
|
||||
---------------------------------------------------------------------
|
||||
3|1
|
||||
6|2
|
||||
(2 rows)
|
||||
|
||||
|
||||
starting permutation: s1-begin s2-begin s1-copy-1 s2-copy-2 s3-insert-2 s1-rollback s2-commit s1-select-all
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-copy-1:
|
||||
COPY write_concurrency_index(b) FROM PROGRAM 'seq 1 2';
|
||||
|
||||
step s2-copy-2:
|
||||
COPY write_concurrency_index(b) FROM PROGRAM 'seq 3 4';
|
||||
|
||||
step s3-insert-2:
|
||||
INSERT INTO write_concurrency_index SELECT (7*s)::text, s FROM generate_series(2,3) s;
|
||||
<waiting ...>
|
||||
step s1-rollback:
|
||||
ROLLBACK;
|
||||
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s3-insert-2: <... completed>
|
||||
ERROR: duplicate key value violates unique constraint "write_concurrency_index_b_key"
|
||||
step s1-select-all:
|
||||
SELECT * FROM write_concurrency_index ORDER BY a,b;
|
||||
|
||||
a|b
|
||||
---------------------------------------------------------------------
|
||||
|3
|
||||
|4
|
||||
(2 rows)
|
||||
|
||||
|
||||
starting permutation: s1-begin s2-begin s1-insert-1 s2-copy-2 s3-insert-2 s1-rollback s2-rollback s1-select-all
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-insert-1:
|
||||
INSERT INTO write_concurrency_index SELECT (3*s)::text, s FROM generate_series(1,2) s;
|
||||
|
||||
step s2-copy-2:
|
||||
COPY write_concurrency_index(b) FROM PROGRAM 'seq 3 4';
|
||||
|
||||
step s3-insert-2:
|
||||
INSERT INTO write_concurrency_index SELECT (7*s)::text, s FROM generate_series(2,3) s;
|
||||
<waiting ...>
|
||||
step s1-rollback:
|
||||
ROLLBACK;
|
||||
|
||||
step s2-rollback:
|
||||
ROLLBACK;
|
||||
|
||||
step s3-insert-2: <... completed>
|
||||
step s1-select-all:
|
||||
SELECT * FROM write_concurrency_index ORDER BY a,b;
|
||||
|
||||
a|b
|
||||
---------------------------------------------------------------------
|
||||
14|2
|
||||
21|3
|
||||
(2 rows)
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-insert-2 s2-insert-4 s1-rollback s1-select-all
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-insert-2:
|
||||
INSERT INTO write_concurrency_index SELECT s::text, 3*s FROM generate_series(1,2) s;
|
||||
|
||||
step s2-insert-4:
|
||||
INSERT INTO write_concurrency_index SELECT s::text, 2*s FROM generate_series(1,4) s;
|
||||
<waiting ...>
|
||||
step s1-rollback:
|
||||
ROLLBACK;
|
||||
|
||||
step s2-insert-4: <... completed>
|
||||
step s1-select-all:
|
||||
SELECT * FROM write_concurrency_index ORDER BY a,b;
|
||||
|
||||
a|b
|
||||
---------------------------------------------------------------------
|
||||
1|2
|
||||
2|4
|
||||
3|6
|
||||
4|8
|
||||
(4 rows)
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-insert-2 s2-insert-5 s3-insert-3 s1-commit s1-select-all
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-insert-2:
|
||||
INSERT INTO write_concurrency_index SELECT s::text, 3*s FROM generate_series(1,2) s;
|
||||
|
||||
step s2-insert-5:
|
||||
INSERT INTO write_concurrency_index SELECT s::text, 5*s FROM generate_series(1,2) s;
|
||||
<waiting ...>
|
||||
step s3-insert-3:
|
||||
INSERT INTO write_concurrency_index SELECT s::text, 7*s FROM generate_series(3,4) s;
|
||||
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-insert-5: <... completed>
|
||||
ERROR: conflicting key value violates exclusion constraint "write_concurrency_index_a_excl"
|
||||
step s1-select-all:
|
||||
SELECT * FROM write_concurrency_index ORDER BY a,b;
|
||||
|
||||
a| b
|
||||
---------------------------------------------------------------------
|
||||
1| 3
|
||||
2| 6
|
||||
3|21
|
||||
4|28
|
||||
(4 rows)
|
||||
|
||||
|
||||
starting permutation: s1-begin s2-begin s1-insert-2 s2-insert-6 s3-insert-4 s1-commit s2-rollback s1-select-all
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-insert-2:
|
||||
INSERT INTO write_concurrency_index SELECT s::text, 3*s FROM generate_series(1,2) s;
|
||||
|
||||
step s2-insert-6:
|
||||
INSERT INTO write_concurrency_index SELECT s::text, 5*s FROM generate_series(3,4) s;
|
||||
|
||||
step s3-insert-4:
|
||||
INSERT INTO write_concurrency_index SELECT s::text, 7*s FROM generate_series(2,3) s;
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s3-insert-4: <... completed>
|
||||
ERROR: conflicting key value violates exclusion constraint "write_concurrency_index_a_excl"
|
||||
step s2-rollback:
|
||||
ROLLBACK;
|
||||
|
||||
step s1-select-all:
|
||||
SELECT * FROM write_concurrency_index ORDER BY a,b;
|
||||
|
||||
a|b
|
||||
---------------------------------------------------------------------
|
||||
1|3
|
||||
2|6
|
||||
(2 rows)
|
||||
|
||||
|
||||
starting permutation: s1-begin s2-begin s1-insert-2 s2-insert-6 s3-insert-4 s1-rollback s2-rollback s1-select-all
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-insert-2:
|
||||
INSERT INTO write_concurrency_index SELECT s::text, 3*s FROM generate_series(1,2) s;
|
||||
|
||||
step s2-insert-6:
|
||||
INSERT INTO write_concurrency_index SELECT s::text, 5*s FROM generate_series(3,4) s;
|
||||
|
||||
step s3-insert-4:
|
||||
INSERT INTO write_concurrency_index SELECT s::text, 7*s FROM generate_series(2,3) s;
|
||||
<waiting ...>
|
||||
step s1-rollback:
|
||||
ROLLBACK;
|
||||
|
||||
step s2-rollback:
|
||||
ROLLBACK;
|
||||
|
||||
step s3-insert-4: <... completed>
|
||||
step s1-select-all:
|
||||
SELECT * FROM write_concurrency_index ORDER BY a,b;
|
||||
|
||||
a| b
|
||||
---------------------------------------------------------------------
|
||||
2|14
|
||||
3|21
|
||||
(2 rows)
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-insert-1 s2-index-select-all-b s1-rollback
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-insert-1:
|
||||
INSERT INTO write_concurrency_index SELECT (3*s)::text, s FROM generate_series(1,2) s;
|
||||
|
||||
step s2-index-select-all-b:
|
||||
SET enable_seqscan TO OFF;
|
||||
SET columnar.enable_custom_scan TO OFF;
|
||||
SELECT b FROM write_concurrency_index ORDER BY 1;
|
||||
|
||||
b
|
||||
-
|
||||
(0 rows)
|
||||
|
||||
step s1-rollback:
|
||||
ROLLBACK;
|
||||
|
||||
|
||||
starting permutation: s1-begin s2-begin s1-insert-1 s2-copy-2 s2-index-select-all-b s3-index-select-all-b s1-commit s2-index-select-all-b s2-rollback
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-insert-1:
|
||||
INSERT INTO write_concurrency_index SELECT (3*s)::text, s FROM generate_series(1,2) s;
|
||||
|
||||
step s2-copy-2:
|
||||
COPY write_concurrency_index(b) FROM PROGRAM 'seq 3 4';
|
||||
|
||||
step s2-index-select-all-b:
|
||||
SET enable_seqscan TO OFF;
|
||||
SET columnar.enable_custom_scan TO OFF;
|
||||
SELECT b FROM write_concurrency_index ORDER BY 1;
|
||||
|
||||
b
|
||||
-
|
||||
3
|
||||
4
|
||||
(2 rows)
|
||||
|
||||
step s3-index-select-all-b:
|
||||
SET enable_seqscan TO OFF;
|
||||
SET columnar.enable_custom_scan TO OFF;
|
||||
SELECT b FROM write_concurrency_index ORDER BY 1;
|
||||
|
||||
b
|
||||
-
|
||||
(0 rows)
|
||||
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-index-select-all-b:
|
||||
SET enable_seqscan TO OFF;
|
||||
SET columnar.enable_custom_scan TO OFF;
|
||||
SELECT b FROM write_concurrency_index ORDER BY 1;
|
||||
|
||||
b
|
||||
-
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
(4 rows)
|
||||
|
||||
step s2-rollback:
|
||||
ROLLBACK;
|
||||
|
||||
|
||||
starting permutation: s1-begin s2-begin s1-insert-1 s1-select-all s2-insert-1 s1-commit s2-rollback
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-insert-1:
|
||||
INSERT INTO write_concurrency_index SELECT (3*s)::text, s FROM generate_series(1,2) s;
|
||||
|
||||
step s1-select-all:
|
||||
SELECT * FROM write_concurrency_index ORDER BY a,b;
|
||||
|
||||
a|b
|
||||
---------------------------------------------------------------------
|
||||
3|1
|
||||
6|2
|
||||
(2 rows)
|
||||
|
||||
step s2-insert-1:
|
||||
INSERT INTO write_concurrency_index SELECT (2*s)::text, s FROM generate_series(1,4) s;
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-insert-1: <... completed>
|
||||
ERROR: duplicate key value violates unique constraint "write_concurrency_index_b_key"
|
||||
step s2-rollback:
|
||||
ROLLBACK;
|
||||
|
||||
|
||||
starting permutation: s1-begin s2-begin s1-insert-1 s1-select-all s2-insert-1 s1-rollback s2-rollback
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-insert-1:
|
||||
INSERT INTO write_concurrency_index SELECT (3*s)::text, s FROM generate_series(1,2) s;
|
||||
|
||||
step s1-select-all:
|
||||
SELECT * FROM write_concurrency_index ORDER BY a,b;
|
||||
|
||||
a|b
|
||||
---------------------------------------------------------------------
|
||||
3|1
|
||||
6|2
|
||||
(2 rows)
|
||||
|
||||
step s2-insert-1:
|
||||
INSERT INTO write_concurrency_index SELECT (2*s)::text, s FROM generate_series(1,4) s;
|
||||
<waiting ...>
|
||||
step s1-rollback:
|
||||
ROLLBACK;
|
||||
|
||||
step s2-insert-1: <... completed>
|
||||
step s2-rollback:
|
||||
ROLLBACK;
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-copy-1 s1-select-all s2-insert-2 s3-insert-1 s1-rollback s1-select-all
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-copy-1:
|
||||
COPY write_concurrency_index(b) FROM PROGRAM 'seq 1 2';
|
||||
|
||||
step s1-select-all:
|
||||
SELECT * FROM write_concurrency_index ORDER BY a,b;
|
||||
|
||||
a|b
|
||||
---------------------------------------------------------------------
|
||||
|1
|
||||
|2
|
||||
(2 rows)
|
||||
|
||||
step s2-insert-2:
|
||||
INSERT INTO write_concurrency_index SELECT (5*s)::text, s FROM generate_series(1,2) s;
|
||||
<waiting ...>
|
||||
step s3-insert-1:
|
||||
INSERT INTO write_concurrency_index SELECT (7*s)::text, s FROM generate_series(3,4) s;
|
||||
|
||||
step s1-rollback:
|
||||
ROLLBACK;
|
||||
|
||||
step s2-insert-2: <... completed>
|
||||
step s1-select-all:
|
||||
SELECT * FROM write_concurrency_index ORDER BY a,b;
|
||||
|
||||
a|b
|
||||
---------------------------------------------------------------------
|
||||
10|2
|
||||
21|3
|
||||
28|4
|
||||
5|1
|
||||
(4 rows)
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-insert-2 s1-select-all s2-insert-5 s3-insert-3 s1-commit s1-select-all
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-insert-2:
|
||||
INSERT INTO write_concurrency_index SELECT s::text, 3*s FROM generate_series(1,2) s;
|
||||
|
||||
step s1-select-all:
|
||||
SELECT * FROM write_concurrency_index ORDER BY a,b;
|
||||
|
||||
a|b
|
||||
---------------------------------------------------------------------
|
||||
1|3
|
||||
2|6
|
||||
(2 rows)
|
||||
|
||||
step s2-insert-5:
|
||||
INSERT INTO write_concurrency_index SELECT s::text, 5*s FROM generate_series(1,2) s;
|
||||
<waiting ...>
|
||||
step s3-insert-3:
|
||||
INSERT INTO write_concurrency_index SELECT s::text, 7*s FROM generate_series(3,4) s;
|
||||
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-insert-5: <... completed>
|
||||
ERROR: conflicting key value violates exclusion constraint "write_concurrency_index_a_excl"
|
||||
step s1-select-all:
|
||||
SELECT * FROM write_concurrency_index ORDER BY a,b;
|
||||
|
||||
a| b
|
||||
---------------------------------------------------------------------
|
||||
1| 3
|
||||
2| 6
|
||||
3|21
|
||||
4|28
|
||||
(4 rows)
|
||||
|
||||
|
||||
starting permutation: s1-begin s2-begin s1-insert-2 s1-select-all s2-insert-6 s3-insert-4 s1-rollback s2-rollback s1-select-all
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-insert-2:
|
||||
INSERT INTO write_concurrency_index SELECT s::text, 3*s FROM generate_series(1,2) s;
|
||||
|
||||
step s1-select-all:
|
||||
SELECT * FROM write_concurrency_index ORDER BY a,b;
|
||||
|
||||
a|b
|
||||
---------------------------------------------------------------------
|
||||
1|3
|
||||
2|6
|
||||
(2 rows)
|
||||
|
||||
step s2-insert-6:
|
||||
INSERT INTO write_concurrency_index SELECT s::text, 5*s FROM generate_series(3,4) s;
|
||||
|
||||
step s3-insert-4:
|
||||
INSERT INTO write_concurrency_index SELECT s::text, 7*s FROM generate_series(2,3) s;
|
||||
<waiting ...>
|
||||
step s1-rollback:
|
||||
ROLLBACK;
|
||||
|
||||
step s2-rollback:
|
||||
ROLLBACK;
|
||||
|
||||
step s3-insert-4: <... completed>
|
||||
step s1-select-all:
|
||||
SELECT * FROM write_concurrency_index ORDER BY a,b;
|
||||
|
||||
a| b
|
||||
---------------------------------------------------------------------
|
||||
2|14
|
||||
3|21
|
||||
(2 rows)
|
||||
|
||||
|
||||
starting permutation: s1-begin s2-begin-repeatable s1-insert-1 s2-insert-1 s1-commit s2-rollback
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-begin-repeatable:
|
||||
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||
|
||||
step s1-insert-1:
|
||||
INSERT INTO write_concurrency_index SELECT (3*s)::text, s FROM generate_series(1,2) s;
|
||||
|
||||
step s2-insert-1:
|
||||
INSERT INTO write_concurrency_index SELECT (2*s)::text, s FROM generate_series(1,4) s;
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-insert-1: <... completed>
|
||||
ERROR: duplicate key value violates unique constraint "write_concurrency_index_b_key"
|
||||
step s2-rollback:
|
||||
ROLLBACK;
|
||||
|
|
@ -44,24 +44,6 @@ step "s1-truncate"
|
|||
TRUNCATE test_insert_concurrency;
|
||||
}
|
||||
|
||||
step "s1-verify-metadata"
|
||||
{
|
||||
WITH test_insert_concurrency_stripes AS (
|
||||
SELECT first_row_number, stripe_num, row_count
|
||||
FROM columnar.stripe a, pg_class b
|
||||
WHERE columnar_relation_storageid(b.oid)=a.storage_id AND
|
||||
relname = 'test_insert_concurrency'
|
||||
)
|
||||
SELECT
|
||||
-- verify that table has two stripes ..
|
||||
count(*) = 2 AND
|
||||
-- .. and those stripes look like:
|
||||
sum(case when stripe_num = 1 AND first_row_number = 150001 AND row_count = 3 then 1 end) = 1 AND
|
||||
sum(case when stripe_num = 2 AND first_row_number = 1 AND row_count = 10000 then 1 end) = 1
|
||||
AS stripe_metadata_for_test_insert_concurrency_ok
|
||||
FROM test_insert_concurrency_stripes;
|
||||
}
|
||||
|
||||
step "s1-commit"
|
||||
{
|
||||
COMMIT;
|
||||
|
@ -103,10 +85,6 @@ permutation "s1-begin" "s2-begin" "s1-copy" "s2-insert" "s1-select" "s2-select"
|
|||
// insert vs copy
|
||||
permutation "s1-begin" "s2-begin" "s2-insert" "s1-copy" "s1-select" "s2-select" "s1-commit" "s2-commit" "s1-select"
|
||||
|
||||
# insert vs insert
|
||||
# Start inserting rows in session 1, reserve first_row_number to be 1 for session 1 but commit session 2 before session 1.
|
||||
# Then verify that while the stripe written by session 2 has the greater first_row_number, stripe written by session 1 has
|
||||
# the greater stripe_num. This is because, we reserve stripe_num and first_row_number at different times.
|
||||
permutation "s1-truncate" "s1-begin" "s1-insert-10000-rows" "s2-begin" "s2-insert" "s2-commit" "s1-commit" "s1-verify-metadata"
|
||||
permutation "s1-truncate" "s1-begin" "s1-insert-10000-rows" "s2-begin" "s2-insert" "s2-commit" "s1-commit"
|
||||
|
||||
permutation "s1-begin" "s2-begin-repeatable" "s1-insert" "s2-insert" "s2-select" "s1-commit" "s2-select" "s2-commit"
|
||||
|
|
|
@ -0,0 +1,174 @@
|
|||
setup
|
||||
{
|
||||
CREATE TABLE write_concurrency_index (a text, b int unique,
|
||||
EXCLUDE USING hash (a WITH =)) USING columnar;
|
||||
}
|
||||
|
||||
teardown
|
||||
{
|
||||
DROP TABLE IF EXISTS write_concurrency_index CASCADE;
|
||||
}
|
||||
|
||||
session "s1"
|
||||
|
||||
step "s1-begin"
|
||||
{
|
||||
BEGIN;
|
||||
}
|
||||
|
||||
step "s1-commit"
|
||||
{
|
||||
COMMIT;
|
||||
}
|
||||
|
||||
step "s1-rollback"
|
||||
{
|
||||
ROLLBACK;
|
||||
}
|
||||
|
||||
step "s1-insert-1"
|
||||
{
|
||||
INSERT INTO write_concurrency_index SELECT (3*s)::text, s FROM generate_series(1,2) s;
|
||||
}
|
||||
|
||||
step "s1-insert-2"
|
||||
{
|
||||
INSERT INTO write_concurrency_index SELECT s::text, 3*s FROM generate_series(1,2) s;
|
||||
}
|
||||
|
||||
step "s1-copy-1"
|
||||
{
|
||||
COPY write_concurrency_index(b) FROM PROGRAM 'seq 1 2';
|
||||
}
|
||||
|
||||
step "s1-select-all"
|
||||
{
|
||||
SELECT * FROM write_concurrency_index ORDER BY a,b;
|
||||
}
|
||||
|
||||
session "s2"
|
||||
|
||||
step "s2-begin"
|
||||
{
|
||||
BEGIN;
|
||||
}
|
||||
|
||||
step "s2-begin-repeatable"
|
||||
{
|
||||
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||
}
|
||||
|
||||
step "s2-commit"
|
||||
{
|
||||
COMMIT;
|
||||
}
|
||||
|
||||
step "s2-rollback"
|
||||
{
|
||||
ROLLBACK;
|
||||
}
|
||||
|
||||
step "s2-insert-1"
|
||||
{
|
||||
INSERT INTO write_concurrency_index SELECT (2*s)::text, s FROM generate_series(1,4) s;
|
||||
}
|
||||
|
||||
step "s2-insert-2"
|
||||
{
|
||||
INSERT INTO write_concurrency_index SELECT (5*s)::text, s FROM generate_series(1,2) s;
|
||||
}
|
||||
|
||||
step "s2-insert-3"
|
||||
{
|
||||
INSERT INTO write_concurrency_index SELECT (5*s)::text, s FROM generate_series(3,4) s;
|
||||
}
|
||||
|
||||
step "s2-insert-4"
|
||||
{
|
||||
INSERT INTO write_concurrency_index SELECT s::text, 2*s FROM generate_series(1,4) s;
|
||||
}
|
||||
|
||||
step "s2-insert-5"
|
||||
{
|
||||
INSERT INTO write_concurrency_index SELECT s::text, 5*s FROM generate_series(1,2) s;
|
||||
}
|
||||
|
||||
step "s2-insert-6"
|
||||
{
|
||||
INSERT INTO write_concurrency_index SELECT s::text, 5*s FROM generate_series(3,4) s;
|
||||
}
|
||||
|
||||
step "s2-copy-1"
|
||||
{
|
||||
COPY write_concurrency_index(b) FROM PROGRAM 'seq 1 4';
|
||||
}
|
||||
|
||||
step "s2-copy-2"
|
||||
{
|
||||
COPY write_concurrency_index(b) FROM PROGRAM 'seq 3 4';
|
||||
}
|
||||
|
||||
step "s2-index-select-all-b"
|
||||
{
|
||||
SET enable_seqscan TO OFF;
|
||||
SET columnar.enable_custom_scan TO OFF;
|
||||
SELECT b FROM write_concurrency_index ORDER BY 1;
|
||||
}
|
||||
|
||||
session "s3"
|
||||
|
||||
step "s3-insert-1"
|
||||
{
|
||||
INSERT INTO write_concurrency_index SELECT (7*s)::text, s FROM generate_series(3,4) s;
|
||||
}
|
||||
|
||||
step "s3-insert-2"
|
||||
{
|
||||
INSERT INTO write_concurrency_index SELECT (7*s)::text, s FROM generate_series(2,3) s;
|
||||
}
|
||||
|
||||
step "s3-insert-3"
|
||||
{
|
||||
INSERT INTO write_concurrency_index SELECT s::text, 7*s FROM generate_series(3,4) s;
|
||||
}
|
||||
|
||||
step "s3-insert-4"
|
||||
{
|
||||
INSERT INTO write_concurrency_index SELECT s::text, 7*s FROM generate_series(2,3) s;
|
||||
}
|
||||
|
||||
step "s3-index-select-all-b"
|
||||
{
|
||||
SET enable_seqscan TO OFF;
|
||||
SET columnar.enable_custom_scan TO OFF;
|
||||
SELECT b FROM write_concurrency_index ORDER BY 1;
|
||||
}
|
||||
|
||||
# unique (btree) on int column
|
||||
permutation "s1-begin" "s1-insert-1" "s2-copy-1" "s1-commit" "s1-select-all"
|
||||
permutation "s1-begin" "s1-copy-1" "s2-insert-1" "s1-rollback" "s1-select-all"
|
||||
permutation "s1-begin" "s1-copy-1" "s2-insert-2" "s3-insert-1" "s1-commit" "s1-select-all"
|
||||
permutation "s1-begin" "s1-insert-1" "s2-insert-2" "s3-insert-1" "s1-rollback" "s1-select-all"
|
||||
permutation "s1-begin" "s2-begin" "s1-insert-1" "s2-insert-3" "s3-insert-2" "s1-commit" "s2-rollback" "s1-select-all"
|
||||
permutation "s1-begin" "s2-begin" "s1-copy-1" "s2-copy-2" "s3-insert-2" "s1-rollback" "s2-commit" "s1-select-all"
|
||||
permutation "s1-begin" "s2-begin" "s1-insert-1" "s2-copy-2" "s3-insert-2" "s1-rollback" "s2-rollback" "s1-select-all"
|
||||
|
||||
# exclusion (hash) on text column that checks against duplicate values
|
||||
permutation "s1-begin" "s1-insert-2" "s2-insert-4" "s1-rollback" "s1-select-all"
|
||||
permutation "s1-begin" "s1-insert-2" "s2-insert-5" "s3-insert-3" "s1-commit" "s1-select-all"
|
||||
permutation "s1-begin" "s2-begin" "s1-insert-2" "s2-insert-6" "s3-insert-4" "s1-commit" "s2-rollback" "s1-select-all"
|
||||
permutation "s1-begin" "s2-begin" "s1-insert-2" "s2-insert-6" "s3-insert-4" "s1-rollback" "s2-rollback" "s1-select-all"
|
||||
|
||||
# make sure that pending writes are not visible to other backends
|
||||
permutation "s1-begin" "s1-insert-1" "s2-index-select-all-b" "s1-rollback"
|
||||
permutation "s1-begin" "s2-begin" "s1-insert-1" "s2-copy-2" "s2-index-select-all-b" "s3-index-select-all-b" "s1-commit" "s2-index-select-all-b" "s2-rollback"
|
||||
|
||||
# force flushing write state of s1 before inserting some more data via other sessions
|
||||
permutation "s1-begin" "s2-begin" "s1-insert-1" "s1-select-all" "s2-insert-1" "s1-commit" "s2-rollback"
|
||||
permutation "s1-begin" "s2-begin" "s1-insert-1" "s1-select-all" "s2-insert-1" "s1-rollback" "s2-rollback"
|
||||
permutation "s1-begin" "s1-copy-1" "s1-select-all" "s2-insert-2" "s3-insert-1" "s1-rollback" "s1-select-all"
|
||||
permutation "s1-begin" "s1-insert-2" "s1-select-all" "s2-insert-5" "s3-insert-3" "s1-commit" "s1-select-all"
|
||||
permutation "s1-begin" "s2-begin" "s1-insert-2" "s1-select-all" "s2-insert-6" "s3-insert-4" "s1-rollback" "s2-rollback" "s1-select-all"
|
||||
|
||||
# test with repeatable read isolation mode
|
||||
permutation "s1-begin" "s2-begin-repeatable" "s1-insert-1" "s2-insert-1" "s1-commit" "s2-rollback"
|
Loading…
Reference in New Issue