mirror of https://github.com/citusdata/citus.git
Handle aborted writes properly when scanning a columnar table (#5244)
If it is certain that we will not use any `parallel_worker`s for a columnar table, then stripe entries inserted by aborted transactions become visible to `SnapshotAny` and that causes `REINDEX` to fail by throwing a duplicate key error. To fix that: * consider three states for a stripe write operation: "flushed", "aborted", or "in-progress", * make sure to have a clear separation between them, and * act according to those three states when reading from a columnar tablepull/5245/head
parent
5dc619162d
commit
5825c44d5f
|
@ -51,6 +51,7 @@
|
|||
#include "port.h"
|
||||
#include "storage/fd.h"
|
||||
#include "storage/lmgr.h"
|
||||
#include "storage/procarray.h"
|
||||
#include "storage/smgr.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/fmgroids.h"
|
||||
|
@ -91,7 +92,8 @@ static void GetHighestUsedAddressAndId(uint64 storageId,
|
|||
static StripeMetadata * UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId,
|
||||
bool *update, Datum *newValues);
|
||||
static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot);
|
||||
static StripeMetadata * BuildStripeMetadata(Datum *datumArray);
|
||||
static StripeMetadata * BuildStripeMetadata(Relation columnarStripes,
|
||||
HeapTuple heapTuple);
|
||||
static uint32 * ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32
|
||||
chunkGroupCount, Snapshot snapshot);
|
||||
static Oid ColumnarStorageIdSequenceRelationId(void);
|
||||
|
@ -701,19 +703,23 @@ FindStripeWithMatchingFirstRowNumber(Relation relation, uint64 rowNumber,
|
|||
|
||||
|
||||
/*
|
||||
* StripeIsFlushed returns true if stripe with stripeMetadata is flushed to
|
||||
* disk.
|
||||
* StripeWriteState returns write state of given stripe.
|
||||
*/
|
||||
bool
|
||||
StripeIsFlushed(StripeMetadata *stripeMetadata)
|
||||
StripeWriteStateEnum
|
||||
StripeWriteState(StripeMetadata *stripeMetadata)
|
||||
{
|
||||
/*
|
||||
* We insert dummy stripe metadata entry when inserting the first row.
|
||||
* For this reason, rowCount being equal to 0 cannot mean a valid stripe
|
||||
* with 0 rows but a stripe that is not flushed to disk, probably because
|
||||
* of an aborted xact.
|
||||
*/
|
||||
return stripeMetadata->rowCount > 0;
|
||||
if (stripeMetadata->aborted)
|
||||
{
|
||||
return STRIPE_WRITE_ABORTED;
|
||||
}
|
||||
else if (stripeMetadata->rowCount > 0)
|
||||
{
|
||||
return STRIPE_WRITE_FLUSHED;
|
||||
}
|
||||
else
|
||||
{
|
||||
return STRIPE_WRITE_IN_PROGRESS;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -783,13 +789,7 @@ StripeMetadataLookupRowNumber(Relation relation, uint64 rowNumber, Snapshot snap
|
|||
HeapTuple heapTuple = systable_getnext_ordered(scanDescriptor, scanDirection);
|
||||
if (HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes);
|
||||
Datum datumArray[Natts_columnar_stripe];
|
||||
bool isNullArray[Natts_columnar_stripe];
|
||||
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
||||
|
||||
foundStripeMetadata = BuildStripeMetadata(datumArray);
|
||||
CheckStripeMetadataConsistency(foundStripeMetadata);
|
||||
foundStripeMetadata = BuildStripeMetadata(columnarStripes, heapTuple);
|
||||
}
|
||||
|
||||
systable_endscan_ordered(scanDescriptor);
|
||||
|
@ -801,30 +801,79 @@ StripeMetadataLookupRowNumber(Relation relation, uint64 rowNumber, Snapshot snap
|
|||
|
||||
|
||||
/*
|
||||
* CheckStripeMetadataConsistency errors out if given StripeMetadata object
|
||||
* belongs to an un-flushed stripe but some fields of it contradicts with
|
||||
* this fact.
|
||||
* CheckStripeMetadataConsistency first decides if stripe write operation for
|
||||
* given stripe is "flushed", "aborted" or "in-progress", then errors out if
|
||||
* its metadata entry contradicts with this fact.
|
||||
*
|
||||
* Checks performed here are just to catch bugs, so it is encouraged to call
|
||||
* this function whenever a StripeMetadata object is built from an heap tuple
|
||||
* of columnar.stripe. Currently, BuildStripeMetadata is the only function
|
||||
* that does this.
|
||||
*/
|
||||
static void
|
||||
CheckStripeMetadataConsistency(StripeMetadata *stripeMetadata)
|
||||
{
|
||||
if (StripeIsFlushed(stripeMetadata))
|
||||
{
|
||||
return;
|
||||
}
|
||||
bool stripeLooksInProgress =
|
||||
stripeMetadata->rowCount == 0 && stripeMetadata->chunkCount == 0 &&
|
||||
stripeMetadata->fileOffset == ColumnarInvalidLogicalOffset &&
|
||||
stripeMetadata->dataLength == 0;
|
||||
|
||||
if (stripeMetadata->rowCount > 0 || stripeMetadata->chunkCount > 0 ||
|
||||
stripeMetadata->fileOffset != ColumnarInvalidLogicalOffset ||
|
||||
stripeMetadata->dataLength > 0)
|
||||
/*
|
||||
* Even if stripe is flushed, fileOffset and dataLength might be equal
|
||||
* to 0 for zero column tables, but those two should still be consistent
|
||||
* with respect to each other.
|
||||
*/
|
||||
bool stripeLooksFlushed =
|
||||
stripeMetadata->rowCount > 0 && stripeMetadata->chunkCount > 0 &&
|
||||
((stripeMetadata->fileOffset != ColumnarInvalidLogicalOffset &&
|
||||
stripeMetadata->dataLength > 0) ||
|
||||
(stripeMetadata->fileOffset == ColumnarInvalidLogicalOffset &&
|
||||
stripeMetadata->dataLength == 0));
|
||||
|
||||
switch (StripeWriteState(stripeMetadata))
|
||||
{
|
||||
/*
|
||||
* If stripe was not flushed to disk, then values of given four
|
||||
* fields should match the columns inserted by
|
||||
* InsertEmptyStripeMetadataRow.
|
||||
*/
|
||||
ereport(ERROR, (errmsg("unexpected stripe state, stripe with id="
|
||||
UINT64_FORMAT " was not flushed properly",
|
||||
stripeMetadata->id)));
|
||||
case STRIPE_WRITE_FLUSHED:
|
||||
{
|
||||
/*
|
||||
* If stripe was flushed to disk, then we expect stripe to store
|
||||
* at least one tuple.
|
||||
*/
|
||||
if (stripeLooksFlushed)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
case STRIPE_WRITE_IN_PROGRESS:
|
||||
{
|
||||
/*
|
||||
* If stripe was not flushed to disk, then values of given four
|
||||
* fields should match the columns inserted by
|
||||
* InsertEmptyStripeMetadataRow.
|
||||
*/
|
||||
if (stripeLooksInProgress)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
case STRIPE_WRITE_ABORTED:
|
||||
{
|
||||
/*
|
||||
* Stripe metadata entry for an aborted write can be complete or
|
||||
* incomplete. We might have aborted the transaction before or after
|
||||
* inserting into stripe metadata.
|
||||
*/
|
||||
if (stripeLooksInProgress || stripeLooksFlushed)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
default:
|
||||
ereport(ERROR, (errmsg("unexpected stripe state, stripe metadata "
|
||||
"entry for stripe with id=" UINT64_FORMAT
|
||||
" is not consistent", stripeMetadata->id)));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -853,12 +902,7 @@ FindStripeWithHighestRowNumber(Relation relation, Snapshot snapshot)
|
|||
HeapTuple heapTuple = systable_getnext_ordered(scanDescriptor, BackwardScanDirection);
|
||||
if (HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes);
|
||||
Datum datumArray[Natts_columnar_stripe];
|
||||
bool isNullArray[Natts_columnar_stripe];
|
||||
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
||||
|
||||
stripeWithHighestRowNumber = BuildStripeMetadata(datumArray);
|
||||
stripeWithHighestRowNumber = BuildStripeMetadata(columnarStripes, heapTuple);
|
||||
}
|
||||
|
||||
systable_endscan_ordered(scanDescriptor);
|
||||
|
@ -1147,6 +1191,9 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, bool *update,
|
|||
|
||||
heap_inplace_update(columnarStripes, modifiedTuple);
|
||||
|
||||
StripeMetadata *modifiedStripeMetadata = BuildStripeMetadata(columnarStripes,
|
||||
modifiedTuple);
|
||||
|
||||
CommandCounterIncrement();
|
||||
|
||||
systable_endscan_ordered(scanDescriptor);
|
||||
|
@ -1154,10 +1201,7 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, bool *update,
|
|||
table_close(columnarStripes, AccessShareLock);
|
||||
|
||||
/* return StripeMetadata object built from modified tuple */
|
||||
Datum datumArray[Natts_columnar_stripe];
|
||||
bool isNullArray[Natts_columnar_stripe];
|
||||
heap_deform_tuple(modifiedTuple, tupleDescriptor, datumArray, isNullArray);
|
||||
return BuildStripeMetadata(datumArray);
|
||||
return modifiedStripeMetadata;
|
||||
}
|
||||
|
||||
|
||||
|
@ -1180,7 +1224,6 @@ ReadDataFileStripeList(uint64 storageId, Snapshot snapshot)
|
|||
Relation columnarStripes = table_open(columnarStripesOid, AccessShareLock);
|
||||
Relation index = index_open(ColumnarStripeFirstRowNumberIndexRelationId(),
|
||||
AccessShareLock);
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes);
|
||||
|
||||
SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarStripes, index,
|
||||
snapshot, 1,
|
||||
|
@ -1189,11 +1232,7 @@ ReadDataFileStripeList(uint64 storageId, Snapshot snapshot)
|
|||
while (HeapTupleIsValid(heapTuple = systable_getnext_ordered(scanDescriptor,
|
||||
ForwardScanDirection)))
|
||||
{
|
||||
Datum datumArray[Natts_columnar_stripe];
|
||||
bool isNullArray[Natts_columnar_stripe];
|
||||
|
||||
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
||||
StripeMetadata *stripeMetadata = BuildStripeMetadata(datumArray);
|
||||
StripeMetadata *stripeMetadata = BuildStripeMetadata(columnarStripes, heapTuple);
|
||||
stripeMetadataList = lappend(stripeMetadataList, stripeMetadata);
|
||||
}
|
||||
|
||||
|
@ -1206,11 +1245,18 @@ ReadDataFileStripeList(uint64 storageId, Snapshot snapshot)
|
|||
|
||||
|
||||
/*
|
||||
* BuildStripeMetadata builds a StripeMetadata object from given datumArray.
|
||||
* BuildStripeMetadata builds a StripeMetadata object from given heap tuple.
|
||||
*/
|
||||
static StripeMetadata *
|
||||
BuildStripeMetadata(Datum *datumArray)
|
||||
BuildStripeMetadata(Relation columnarStripes, HeapTuple heapTuple)
|
||||
{
|
||||
Assert(RelationGetRelid(columnarStripes) == ColumnarStripeRelationId());
|
||||
|
||||
Datum datumArray[Natts_columnar_stripe];
|
||||
bool isNullArray[Natts_columnar_stripe];
|
||||
heap_deform_tuple(heapTuple, RelationGetDescr(columnarStripes),
|
||||
datumArray, isNullArray);
|
||||
|
||||
StripeMetadata *stripeMetadata = palloc0(sizeof(StripeMetadata));
|
||||
stripeMetadata->id = DatumGetInt64(datumArray[Anum_columnar_stripe_stripe - 1]);
|
||||
stripeMetadata->fileOffset = DatumGetInt64(
|
||||
|
@ -1227,6 +1273,12 @@ BuildStripeMetadata(Datum *datumArray)
|
|||
datumArray[Anum_columnar_stripe_row_count - 1]);
|
||||
stripeMetadata->firstRowNumber = DatumGetUInt64(
|
||||
datumArray[Anum_columnar_stripe_first_row_number - 1]);
|
||||
|
||||
TransactionId entryXmin = HeapTupleHeaderGetXmin(heapTuple->t_data);
|
||||
stripeMetadata->aborted = TransactionIdDidAbort(entryXmin);
|
||||
|
||||
CheckStripeMetadataConsistency(stripeMetadata);
|
||||
|
||||
return stripeMetadata;
|
||||
}
|
||||
|
||||
|
|
|
@ -288,7 +288,7 @@ ColumnarReadRowByRowNumber(ColumnarReadState *readState,
|
|||
return false;
|
||||
}
|
||||
|
||||
if (!StripeIsFlushed(stripeMetadata))
|
||||
if (StripeWriteState(stripeMetadata) != STRIPE_WRITE_FLUSHED)
|
||||
{
|
||||
/*
|
||||
* Callers are expected to skip stripes that are not flushed to
|
||||
|
@ -582,7 +582,7 @@ AdvanceStripeRead(ColumnarReadState *readState)
|
|||
readState->snapshot);
|
||||
|
||||
if (readState->currentStripeMetadata &&
|
||||
!StripeIsFlushed(readState->currentStripeMetadata) &&
|
||||
StripeWriteState(readState->currentStripeMetadata) != STRIPE_WRITE_FLUSHED &&
|
||||
!SnapshotMightSeeUnflushedStripes(readState->snapshot))
|
||||
{
|
||||
/*
|
||||
|
@ -596,7 +596,7 @@ AdvanceStripeRead(ColumnarReadState *readState)
|
|||
}
|
||||
|
||||
while (readState->currentStripeMetadata &&
|
||||
!StripeIsFlushed(readState->currentStripeMetadata))
|
||||
StripeWriteState(readState->currentStripeMetadata) != STRIPE_WRITE_FLUSHED)
|
||||
{
|
||||
readState->currentStripeMetadata =
|
||||
FindNextStripeByRowNumber(readState->relation,
|
||||
|
|
|
@ -562,7 +562,8 @@ columnar_index_fetch_tuple(struct IndexFetchTableData *sscan,
|
|||
return false;
|
||||
}
|
||||
|
||||
if (StripeIsFlushed(stripeMetadata) &&
|
||||
StripeWriteStateEnum stripeWriteState = StripeWriteState(stripeMetadata);
|
||||
if (stripeWriteState == STRIPE_WRITE_FLUSHED &&
|
||||
!ColumnarReadRowByRowNumber(scan->cs_readState, rowNumber,
|
||||
slot->tts_values, slot->tts_isnull))
|
||||
{
|
||||
|
@ -573,8 +574,7 @@ columnar_index_fetch_tuple(struct IndexFetchTableData *sscan,
|
|||
*/
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!StripeIsFlushed(stripeMetadata))
|
||||
else if (stripeWriteState == STRIPE_WRITE_ABORTED)
|
||||
{
|
||||
/*
|
||||
* We only expect to see un-flushed stripes when checking against
|
||||
|
@ -582,6 +582,12 @@ columnar_index_fetch_tuple(struct IndexFetchTableData *sscan,
|
|||
* snapshot to index_fetch_tuple callback.
|
||||
*/
|
||||
Assert(snapshot->snapshot_type == SNAPSHOT_DIRTY);
|
||||
return false;
|
||||
}
|
||||
else if (stripeWriteState == STRIPE_WRITE_IN_PROGRESS)
|
||||
{
|
||||
/* similar to aborted writes .. */
|
||||
Assert(snapshot->snapshot_type == SNAPSHOT_DIRTY);
|
||||
|
||||
/*
|
||||
* Stripe that "might" contain the tuple with rowNumber is not
|
||||
|
@ -593,6 +599,14 @@ columnar_index_fetch_tuple(struct IndexFetchTableData *sscan,
|
|||
*/
|
||||
memset(slot->tts_isnull, true, slot->tts_nvalid);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* At this point, we certainly know that stripe is flushed and
|
||||
* ColumnarReadRowByRowNumber successfully filled the tupleslot.
|
||||
*/
|
||||
Assert(stripeWriteState == STRIPE_WRITE_FLUSHED);
|
||||
}
|
||||
|
||||
slot->tts_tableOid = RelationGetRelid(columnarRelation);
|
||||
slot->tts_tid = *tid;
|
||||
|
|
|
@ -179,6 +179,27 @@ typedef struct StripeBuffers
|
|||
} StripeBuffers;
|
||||
|
||||
|
||||
/* return value of StripeWriteState to decide stripe write state */
|
||||
typedef enum StripeWriteStateEnum
|
||||
{
|
||||
/* stripe write is flushed to disk, so it's readable */
|
||||
STRIPE_WRITE_FLUSHED,
|
||||
|
||||
/*
|
||||
* Writer transaction did abort either before inserting into
|
||||
* columnar.stripe or after.
|
||||
*/
|
||||
STRIPE_WRITE_ABORTED,
|
||||
|
||||
/*
|
||||
* Writer transaction is still in-progress. Note that it is not certain
|
||||
* if it is being written by current backend's current transaction or
|
||||
* another backend.
|
||||
*/
|
||||
STRIPE_WRITE_IN_PROGRESS
|
||||
} StripeWriteStateEnum;
|
||||
|
||||
|
||||
/* ColumnarReadState represents state of a columnar scan. */
|
||||
struct ColumnarReadState;
|
||||
typedef struct ColumnarReadState ColumnarReadState;
|
||||
|
@ -268,7 +289,7 @@ extern StripeMetadata * FindStripeByRowNumber(Relation relation, uint64 rowNumbe
|
|||
extern StripeMetadata * FindStripeWithMatchingFirstRowNumber(Relation relation,
|
||||
uint64 rowNumber,
|
||||
Snapshot snapshot);
|
||||
extern bool StripeIsFlushed(StripeMetadata *stripeMetadata);
|
||||
extern StripeWriteStateEnum StripeWriteState(StripeMetadata *stripeMetadata);
|
||||
extern uint64 StripeGetHighestRowNumber(StripeMetadata *stripeMetadata);
|
||||
extern StripeMetadata * FindStripeWithHighestRowNumber(Relation relation,
|
||||
Snapshot snapshot);
|
||||
|
|
|
@ -26,6 +26,9 @@ typedef struct StripeMetadata
|
|||
uint64 rowCount;
|
||||
uint64 id;
|
||||
uint64 firstRowNumber;
|
||||
|
||||
/* see StripeWriteState */
|
||||
bool aborted;
|
||||
} StripeMetadata;
|
||||
|
||||
/*
|
||||
|
|
|
@ -542,5 +542,13 @@ SELECT sum(a)>-1 FROM revisit_same_cgroup WHERE b = '1';
|
|||
t
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE aborted_write_test (a INT PRIMARY KEY) USING columnar;
|
||||
ALTER TABLE aborted_write_test SET (parallel_workers = 0);
|
||||
INSERT INTO aborted_write_test VALUES (16999);
|
||||
INSERT INTO aborted_write_test VALUES (16999);
|
||||
ERROR: duplicate key value violates unique constraint "aborted_write_test_pkey"
|
||||
DETAIL: Key (a)=(16999) already exists.
|
||||
-- since second INSERT already failed, should not throw a "duplicate key" error
|
||||
REINDEX TABLE aborted_write_test;
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA columnar_indexes CASCADE;
|
||||
|
|
|
@ -397,5 +397,14 @@ INSERT INTO revisit_same_cgroup SELECT random()*500, (random()*500)::INT::TEXT F
|
|||
|
||||
SELECT sum(a)>-1 FROM revisit_same_cgroup WHERE b = '1';
|
||||
|
||||
CREATE TABLE aborted_write_test (a INT PRIMARY KEY) USING columnar;
|
||||
ALTER TABLE aborted_write_test SET (parallel_workers = 0);
|
||||
|
||||
INSERT INTO aborted_write_test VALUES (16999);
|
||||
INSERT INTO aborted_write_test VALUES (16999);
|
||||
|
||||
-- since second INSERT already failed, should not throw a "duplicate key" error
|
||||
REINDEX TABLE aborted_write_test;
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA columnar_indexes CASCADE;
|
||||
|
|
Loading…
Reference in New Issue