From 5825c44d5f45ae78cd51484dfe1828f691eb82ea Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Wed, 8 Sep 2021 13:26:11 +0300 Subject: [PATCH] Handle aborted writes properly when scanning a columnar table (#5244) If it is certain that we will not use any `parallel_worker`s for a columnar table, then stripe entries inserted by aborted transactions become visible to `SnapshotAny` and that causes `REINDEX` to fail by throwing a duplicate key error. To fix that: * consider three states for a stripe write operation: "flushed", "aborted", or "in-progress", * make sure to have a clear separation between them, and * act according to those three states when reading from a columnar table --- src/backend/columnar/columnar_metadata.c | 162 ++++++++++++------ src/backend/columnar/columnar_reader.c | 6 +- src/backend/columnar/columnar_tableam.c | 20 ++- src/include/columnar/columnar.h | 23 ++- src/include/columnar/columnar_metadata.h | 3 + .../regress/expected/columnar_indexes.out | 8 + src/test/regress/sql/columnar_indexes.sql | 9 + 7 files changed, 169 insertions(+), 62 deletions(-) diff --git a/src/backend/columnar/columnar_metadata.c b/src/backend/columnar/columnar_metadata.c index 7b6425b76..6319ac3f0 100644 --- a/src/backend/columnar/columnar_metadata.c +++ b/src/backend/columnar/columnar_metadata.c @@ -51,6 +51,7 @@ #include "port.h" #include "storage/fd.h" #include "storage/lmgr.h" +#include "storage/procarray.h" #include "storage/smgr.h" #include "utils/builtins.h" #include "utils/fmgroids.h" @@ -91,7 +92,8 @@ static void GetHighestUsedAddressAndId(uint64 storageId, static StripeMetadata * UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, bool *update, Datum *newValues); static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot); -static StripeMetadata * BuildStripeMetadata(Datum *datumArray); +static StripeMetadata * BuildStripeMetadata(Relation columnarStripes, + HeapTuple heapTuple); static uint32 * ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount, Snapshot snapshot); static Oid ColumnarStorageIdSequenceRelationId(void); @@ -701,19 +703,23 @@ FindStripeWithMatchingFirstRowNumber(Relation relation, uint64 rowNumber, /* - * StripeIsFlushed returns true if stripe with stripeMetadata is flushed to - * disk. + * StripeWriteState returns write state of given stripe. */ -bool -StripeIsFlushed(StripeMetadata *stripeMetadata) +StripeWriteStateEnum +StripeWriteState(StripeMetadata *stripeMetadata) { - /* - * We insert dummy stripe metadata entry when inserting the first row. - * For this reason, rowCount being equal to 0 cannot mean a valid stripe - * with 0 rows but a stripe that is not flushed to disk, probably because - * of an aborted xact. - */ - return stripeMetadata->rowCount > 0; + if (stripeMetadata->aborted) + { + return STRIPE_WRITE_ABORTED; + } + else if (stripeMetadata->rowCount > 0) + { + return STRIPE_WRITE_FLUSHED; + } + else + { + return STRIPE_WRITE_IN_PROGRESS; + } } @@ -783,13 +789,7 @@ StripeMetadataLookupRowNumber(Relation relation, uint64 rowNumber, Snapshot snap HeapTuple heapTuple = systable_getnext_ordered(scanDescriptor, scanDirection); if (HeapTupleIsValid(heapTuple)) { - TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes); - Datum datumArray[Natts_columnar_stripe]; - bool isNullArray[Natts_columnar_stripe]; - heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); - - foundStripeMetadata = BuildStripeMetadata(datumArray); - CheckStripeMetadataConsistency(foundStripeMetadata); + foundStripeMetadata = BuildStripeMetadata(columnarStripes, heapTuple); } systable_endscan_ordered(scanDescriptor); @@ -801,30 +801,79 @@ StripeMetadataLookupRowNumber(Relation relation, uint64 rowNumber, Snapshot snap /* - * CheckStripeMetadataConsistency errors out if given StripeMetadata object - * belongs to an un-flushed stripe but some fields of it contradicts with - * this fact. + * CheckStripeMetadataConsistency first decides if stripe write operation for + * given stripe is "flushed", "aborted" or "in-progress", then errors out if + * its metadata entry contradicts with this fact. + * + * Checks performed here are just to catch bugs, so it is encouraged to call + * this function whenever a StripeMetadata object is built from an heap tuple + * of columnar.stripe. Currently, BuildStripeMetadata is the only function + * that does this. */ static void CheckStripeMetadataConsistency(StripeMetadata *stripeMetadata) { - if (StripeIsFlushed(stripeMetadata)) - { - return; - } + bool stripeLooksInProgress = + stripeMetadata->rowCount == 0 && stripeMetadata->chunkCount == 0 && + stripeMetadata->fileOffset == ColumnarInvalidLogicalOffset && + stripeMetadata->dataLength == 0; - if (stripeMetadata->rowCount > 0 || stripeMetadata->chunkCount > 0 || - stripeMetadata->fileOffset != ColumnarInvalidLogicalOffset || - stripeMetadata->dataLength > 0) + /* + * Even if stripe is flushed, fileOffset and dataLength might be equal + * to 0 for zero column tables, but those two should still be consistent + * with respect to each other. + */ + bool stripeLooksFlushed = + stripeMetadata->rowCount > 0 && stripeMetadata->chunkCount > 0 && + ((stripeMetadata->fileOffset != ColumnarInvalidLogicalOffset && + stripeMetadata->dataLength > 0) || + (stripeMetadata->fileOffset == ColumnarInvalidLogicalOffset && + stripeMetadata->dataLength == 0)); + + switch (StripeWriteState(stripeMetadata)) { - /* - * If stripe was not flushed to disk, then values of given four - * fields should match the columns inserted by - * InsertEmptyStripeMetadataRow. - */ - ereport(ERROR, (errmsg("unexpected stripe state, stripe with id=" - UINT64_FORMAT " was not flushed properly", - stripeMetadata->id))); + case STRIPE_WRITE_FLUSHED: + { + /* + * If stripe was flushed to disk, then we expect stripe to store + * at least one tuple. + */ + if (stripeLooksFlushed) + { + break; + } + } + + case STRIPE_WRITE_IN_PROGRESS: + { + /* + * If stripe was not flushed to disk, then values of given four + * fields should match the columns inserted by + * InsertEmptyStripeMetadataRow. + */ + if (stripeLooksInProgress) + { + break; + } + } + + case STRIPE_WRITE_ABORTED: + { + /* + * Stripe metadata entry for an aborted write can be complete or + * incomplete. We might have aborted the transaction before or after + * inserting into stripe metadata. + */ + if (stripeLooksInProgress || stripeLooksFlushed) + { + break; + } + } + + default: + ereport(ERROR, (errmsg("unexpected stripe state, stripe metadata " + "entry for stripe with id=" UINT64_FORMAT + " is not consistent", stripeMetadata->id))); } } @@ -853,12 +902,7 @@ FindStripeWithHighestRowNumber(Relation relation, Snapshot snapshot) HeapTuple heapTuple = systable_getnext_ordered(scanDescriptor, BackwardScanDirection); if (HeapTupleIsValid(heapTuple)) { - TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes); - Datum datumArray[Natts_columnar_stripe]; - bool isNullArray[Natts_columnar_stripe]; - heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); - - stripeWithHighestRowNumber = BuildStripeMetadata(datumArray); + stripeWithHighestRowNumber = BuildStripeMetadata(columnarStripes, heapTuple); } systable_endscan_ordered(scanDescriptor); @@ -1147,6 +1191,9 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, bool *update, heap_inplace_update(columnarStripes, modifiedTuple); + StripeMetadata *modifiedStripeMetadata = BuildStripeMetadata(columnarStripes, + modifiedTuple); + CommandCounterIncrement(); systable_endscan_ordered(scanDescriptor); @@ -1154,10 +1201,7 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, bool *update, table_close(columnarStripes, AccessShareLock); /* return StripeMetadata object built from modified tuple */ - Datum datumArray[Natts_columnar_stripe]; - bool isNullArray[Natts_columnar_stripe]; - heap_deform_tuple(modifiedTuple, tupleDescriptor, datumArray, isNullArray); - return BuildStripeMetadata(datumArray); + return modifiedStripeMetadata; } @@ -1180,7 +1224,6 @@ ReadDataFileStripeList(uint64 storageId, Snapshot snapshot) Relation columnarStripes = table_open(columnarStripesOid, AccessShareLock); Relation index = index_open(ColumnarStripeFirstRowNumberIndexRelationId(), AccessShareLock); - TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes); SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarStripes, index, snapshot, 1, @@ -1189,11 +1232,7 @@ ReadDataFileStripeList(uint64 storageId, Snapshot snapshot) while (HeapTupleIsValid(heapTuple = systable_getnext_ordered(scanDescriptor, ForwardScanDirection))) { - Datum datumArray[Natts_columnar_stripe]; - bool isNullArray[Natts_columnar_stripe]; - - heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); - StripeMetadata *stripeMetadata = BuildStripeMetadata(datumArray); + StripeMetadata *stripeMetadata = BuildStripeMetadata(columnarStripes, heapTuple); stripeMetadataList = lappend(stripeMetadataList, stripeMetadata); } @@ -1206,11 +1245,18 @@ ReadDataFileStripeList(uint64 storageId, Snapshot snapshot) /* - * BuildStripeMetadata builds a StripeMetadata object from given datumArray. + * BuildStripeMetadata builds a StripeMetadata object from given heap tuple. */ static StripeMetadata * -BuildStripeMetadata(Datum *datumArray) +BuildStripeMetadata(Relation columnarStripes, HeapTuple heapTuple) { + Assert(RelationGetRelid(columnarStripes) == ColumnarStripeRelationId()); + + Datum datumArray[Natts_columnar_stripe]; + bool isNullArray[Natts_columnar_stripe]; + heap_deform_tuple(heapTuple, RelationGetDescr(columnarStripes), + datumArray, isNullArray); + StripeMetadata *stripeMetadata = palloc0(sizeof(StripeMetadata)); stripeMetadata->id = DatumGetInt64(datumArray[Anum_columnar_stripe_stripe - 1]); stripeMetadata->fileOffset = DatumGetInt64( @@ -1227,6 +1273,12 @@ BuildStripeMetadata(Datum *datumArray) datumArray[Anum_columnar_stripe_row_count - 1]); stripeMetadata->firstRowNumber = DatumGetUInt64( datumArray[Anum_columnar_stripe_first_row_number - 1]); + + TransactionId entryXmin = HeapTupleHeaderGetXmin(heapTuple->t_data); + stripeMetadata->aborted = TransactionIdDidAbort(entryXmin); + + CheckStripeMetadataConsistency(stripeMetadata); + return stripeMetadata; } diff --git a/src/backend/columnar/columnar_reader.c b/src/backend/columnar/columnar_reader.c index c88dae8f3..cc72fd77b 100644 --- a/src/backend/columnar/columnar_reader.c +++ b/src/backend/columnar/columnar_reader.c @@ -288,7 +288,7 @@ ColumnarReadRowByRowNumber(ColumnarReadState *readState, return false; } - if (!StripeIsFlushed(stripeMetadata)) + if (StripeWriteState(stripeMetadata) != STRIPE_WRITE_FLUSHED) { /* * Callers are expected to skip stripes that are not flushed to @@ -582,7 +582,7 @@ AdvanceStripeRead(ColumnarReadState *readState) readState->snapshot); if (readState->currentStripeMetadata && - !StripeIsFlushed(readState->currentStripeMetadata) && + StripeWriteState(readState->currentStripeMetadata) != STRIPE_WRITE_FLUSHED && !SnapshotMightSeeUnflushedStripes(readState->snapshot)) { /* @@ -596,7 +596,7 @@ AdvanceStripeRead(ColumnarReadState *readState) } while (readState->currentStripeMetadata && - !StripeIsFlushed(readState->currentStripeMetadata)) + StripeWriteState(readState->currentStripeMetadata) != STRIPE_WRITE_FLUSHED) { readState->currentStripeMetadata = FindNextStripeByRowNumber(readState->relation, diff --git a/src/backend/columnar/columnar_tableam.c b/src/backend/columnar/columnar_tableam.c index a57274b39..c360fad88 100644 --- a/src/backend/columnar/columnar_tableam.c +++ b/src/backend/columnar/columnar_tableam.c @@ -562,7 +562,8 @@ columnar_index_fetch_tuple(struct IndexFetchTableData *sscan, return false; } - if (StripeIsFlushed(stripeMetadata) && + StripeWriteStateEnum stripeWriteState = StripeWriteState(stripeMetadata); + if (stripeWriteState == STRIPE_WRITE_FLUSHED && !ColumnarReadRowByRowNumber(scan->cs_readState, rowNumber, slot->tts_values, slot->tts_isnull)) { @@ -573,8 +574,7 @@ columnar_index_fetch_tuple(struct IndexFetchTableData *sscan, */ return false; } - - if (!StripeIsFlushed(stripeMetadata)) + else if (stripeWriteState == STRIPE_WRITE_ABORTED) { /* * We only expect to see un-flushed stripes when checking against @@ -582,6 +582,12 @@ columnar_index_fetch_tuple(struct IndexFetchTableData *sscan, * snapshot to index_fetch_tuple callback. */ Assert(snapshot->snapshot_type == SNAPSHOT_DIRTY); + return false; + } + else if (stripeWriteState == STRIPE_WRITE_IN_PROGRESS) + { + /* similar to aborted writes .. */ + Assert(snapshot->snapshot_type == SNAPSHOT_DIRTY); /* * Stripe that "might" contain the tuple with rowNumber is not @@ -593,6 +599,14 @@ columnar_index_fetch_tuple(struct IndexFetchTableData *sscan, */ memset(slot->tts_isnull, true, slot->tts_nvalid); } + else + { + /* + * At this point, we certainly know that stripe is flushed and + * ColumnarReadRowByRowNumber successfully filled the tupleslot. + */ + Assert(stripeWriteState == STRIPE_WRITE_FLUSHED); + } slot->tts_tableOid = RelationGetRelid(columnarRelation); slot->tts_tid = *tid; diff --git a/src/include/columnar/columnar.h b/src/include/columnar/columnar.h index d439fd1fe..31bf6703b 100644 --- a/src/include/columnar/columnar.h +++ b/src/include/columnar/columnar.h @@ -179,6 +179,27 @@ typedef struct StripeBuffers } StripeBuffers; +/* return value of StripeWriteState to decide stripe write state */ +typedef enum StripeWriteStateEnum +{ + /* stripe write is flushed to disk, so it's readable */ + STRIPE_WRITE_FLUSHED, + + /* + * Writer transaction did abort either before inserting into + * columnar.stripe or after. + */ + STRIPE_WRITE_ABORTED, + + /* + * Writer transaction is still in-progress. Note that it is not certain + * if it is being written by current backend's current transaction or + * another backend. + */ + STRIPE_WRITE_IN_PROGRESS +} StripeWriteStateEnum; + + /* ColumnarReadState represents state of a columnar scan. */ struct ColumnarReadState; typedef struct ColumnarReadState ColumnarReadState; @@ -268,7 +289,7 @@ extern StripeMetadata * FindStripeByRowNumber(Relation relation, uint64 rowNumbe extern StripeMetadata * FindStripeWithMatchingFirstRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot); -extern bool StripeIsFlushed(StripeMetadata *stripeMetadata); +extern StripeWriteStateEnum StripeWriteState(StripeMetadata *stripeMetadata); extern uint64 StripeGetHighestRowNumber(StripeMetadata *stripeMetadata); extern StripeMetadata * FindStripeWithHighestRowNumber(Relation relation, Snapshot snapshot); diff --git a/src/include/columnar/columnar_metadata.h b/src/include/columnar/columnar_metadata.h index b2d19ff73..07c951773 100644 --- a/src/include/columnar/columnar_metadata.h +++ b/src/include/columnar/columnar_metadata.h @@ -26,6 +26,9 @@ typedef struct StripeMetadata uint64 rowCount; uint64 id; uint64 firstRowNumber; + + /* see StripeWriteState */ + bool aborted; } StripeMetadata; /* diff --git a/src/test/regress/expected/columnar_indexes.out b/src/test/regress/expected/columnar_indexes.out index ac61c6c73..0026c3af8 100644 --- a/src/test/regress/expected/columnar_indexes.out +++ b/src/test/regress/expected/columnar_indexes.out @@ -542,5 +542,13 @@ SELECT sum(a)>-1 FROM revisit_same_cgroup WHERE b = '1'; t (1 row) +CREATE TABLE aborted_write_test (a INT PRIMARY KEY) USING columnar; +ALTER TABLE aborted_write_test SET (parallel_workers = 0); +INSERT INTO aborted_write_test VALUES (16999); +INSERT INTO aborted_write_test VALUES (16999); +ERROR: duplicate key value violates unique constraint "aborted_write_test_pkey" +DETAIL: Key (a)=(16999) already exists. +-- since second INSERT already failed, should not throw a "duplicate key" error +REINDEX TABLE aborted_write_test; SET client_min_messages TO WARNING; DROP SCHEMA columnar_indexes CASCADE; diff --git a/src/test/regress/sql/columnar_indexes.sql b/src/test/regress/sql/columnar_indexes.sql index 5c27812a2..3a7445aba 100644 --- a/src/test/regress/sql/columnar_indexes.sql +++ b/src/test/regress/sql/columnar_indexes.sql @@ -397,5 +397,14 @@ INSERT INTO revisit_same_cgroup SELECT random()*500, (random()*500)::INT::TEXT F SELECT sum(a)>-1 FROM revisit_same_cgroup WHERE b = '1'; +CREATE TABLE aborted_write_test (a INT PRIMARY KEY) USING columnar; +ALTER TABLE aborted_write_test SET (parallel_workers = 0); + +INSERT INTO aborted_write_test VALUES (16999); +INSERT INTO aborted_write_test VALUES (16999); + +-- since second INSERT already failed, should not throw a "duplicate key" error +REINDEX TABLE aborted_write_test; + SET client_min_messages TO WARNING; DROP SCHEMA columnar_indexes CASCADE;