From decfa4bca2021600cdd9049d888f2efbf853d5bd Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Mon, 4 Oct 2021 16:23:52 +0300 Subject: [PATCH] Teach columnar index scan to read local write state, without flushing This removes the last case that we need to flush pending writes on master. Currently, when index scan encounters with a tid that belongs to an unflushed write of the current backend, then we first flush pending writes to be able to read the tuple via usual code-path. With the changes made with this commit, we don't flush pending writes of the current backend to read such a tuple. Instead, we directly read the tuple from local write state of the current backend. However, exclusion constraints always need to read the tuple during index scan to detect a constraint violation. This is not a problem on master before the changes made in this commit. This is because, on master, exclusion constraints always flush single tuple writes since postgres calls index_fetch_tuple for each tuple inserted. That way, concurrent writers can easily access unflushed tuples of other backends for constraint violation checks. For this reason, now that this commit removes that "flushing-on-read" logic from index scan, CI fails for some exclusion constraint tests. --- src/backend/columnar/columnar_reader.c | 176 ++++++++++++++++-- src/backend/columnar/columnar_tableam.c | 25 +-- src/backend/columnar/columnar_writer.c | 35 ++++ src/backend/columnar/write_state_management.c | 29 +++ src/include/columnar/columnar.h | 9 + 5 files changed, 239 insertions(+), 35 deletions(-) diff --git a/src/backend/columnar/columnar_reader.c b/src/backend/columnar/columnar_reader.c index dec19929b..2a3ab83aa 100644 --- a/src/backend/columnar/columnar_reader.c +++ b/src/backend/columnar/columnar_reader.c @@ -96,6 +96,8 @@ struct ColumnarReadState Snapshot snapshot; bool snapshotRegisteredByUs; + + ColumnarWriteState *cachedWriteState; }; /* static function declarations */ @@ -119,6 +121,12 @@ static StripeReadState * BeginStripeRead(StripeMetadata *stripeMetadata, Relatio List *whereClauseList, List *whereClauseVars, MemoryContext stripeReadContext, Snapshot snapshot); +static StripeReadState * BeginBufferRead(StripeMetadata *stripeMetadata, Relation rel, + TupleDesc tupleDesc, List *projectedColumnList, + MemoryContext stripeReadContext); +static StripeReadState * makeBasicStripeReadState(Relation rel, TupleDesc tupleDesc, + List *projectedColumnList, + MemoryContext stripeReadContext); static void AdvanceStripeRead(ColumnarReadState *readState); static bool SnapshotMightSeeUnflushedStripes(Snapshot snapshot); static bool ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues, @@ -440,6 +448,108 @@ ColumnarReadRowByRowNumber(ColumnarReadState *readState, } +struct ColumnarWriteState +{ + TupleDesc tupleDescriptor; + FmgrInfo **comparisonFunctionArray; + RelFileNode relfilenode; + + MemoryContext stripeWriteContext; + MemoryContext perTupleContext; + StripeBuffers *stripeBuffers; + StripeSkipList *stripeSkipList; + EmptyStripeReservation *emptyStripeReservation; + ColumnarOptions options; + ChunkData *chunkData; + + List *chunkGroupRowCounts; + + /* + * compressionBuffer buffer is used as temporary storage during + * data value compression operation. It is kept here to minimize + * memory allocations. It lives in stripeWriteContext and gets + * deallocated when memory context is reset. + */ + StringInfo compressionBuffer; +}; + +void +ColumnarReadBufferByRowNumber(ColumnarReadState *readState, + uint64 rowNumber, Datum *columnValues, + bool *columnNulls) +{ + Relation columnarRelation = readState->relation; + + bool stripeCached = + readState->cachedWriteState && + rowNumber >= readState->cachedWriteState->emptyStripeReservation->stripeFirstRowNumber && + rowNumber < readState->cachedWriteState->emptyStripeReservation->stripeFirstRowNumber + + readState->cachedWriteState->stripeBuffers->rowCount; + Assert(!stripeCached || (readState->currentStripeMetadata && + readState->currentStripeMetadata->id == + readState->cachedWriteState->emptyStripeReservation->stripeId)); + + if (!stripeCached) + { + readState->currentStripeMetadata = FindStripeWithMatchingFirstRowNumber(columnarRelation, + rowNumber, + readState->snapshot); + if (!readState->currentStripeMetadata) + { + ereport(ERROR, (errmsg("not found 3"))); + } + + + readState->cachedWriteState = FindWriteStateByStripeId(columnarRelation->rd_node.relNode, + readState->currentStripeMetadata->id); + if (!readState->cachedWriteState) + { + ereport(ERROR, (errmsg("not found 4"))); + } + + TupleDesc relationTupleDesc = RelationGetDescr(columnarRelation); + readState->stripeReadState = BeginBufferRead(readState->currentStripeMetadata, + columnarRelation, + relationTupleDesc, + readState->projectedColumnList, + readState->stripeReadContext); + } + + uint64 rowOffset = rowNumber - readState->currentStripeMetadata->firstRowNumber; + uint64 serializedRowCount = ColumnarWriteSerializedRowCount(readState->cachedWriteState); + if (rowOffset < serializedRowCount) + { + ReadStripeRowByRowNumber(readState, rowNumber, columnValues, columnNulls); + } + else + { + ChunkData *chunkData = ColumnarWriteChunkData(readState->cachedWriteState); + uint64 chunkDataOffset = rowOffset - serializedRowCount + 1; + for (uint32 columnIndex = 0; columnIndex < chunkData->columnCount; columnIndex++) + { + if (chunkData->existsArray[columnIndex][chunkDataOffset]) + { + columnValues[columnIndex] = chunkData->valueArray[columnIndex][chunkDataOffset]; + columnNulls[columnIndex] = false; + } + else + { + columnNulls[columnIndex] = true; + } + } + } +} + + +Snapshot +ColumnarReadSetSnapshot(ColumnarReadState *readState, Snapshot snapshot) +{ + Snapshot oldSnapshot = readState->snapshot; + readState->snapshot = snapshot; + return oldSnapshot; +} + + /* * ColumnarReadIsCurrentStripe returns true if stripe being read contains * row with given rowNumber. @@ -651,6 +761,55 @@ BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDes { MemoryContext oldContext = MemoryContextSwitchTo(stripeReadContext); + StripeReadState *stripeReadState = makeBasicStripeReadState(rel, tupleDesc, + projectedColumnList, + stripeReadContext); + + stripeReadState->stripeBuffers = LoadFilteredStripeBuffers(rel, + stripeMetadata, + tupleDesc, + projectedColumnList, + whereClauseList, + whereClauseVars, + &stripeReadState-> + chunkGroupsFiltered, + snapshot); + stripeReadState->rowCount = stripeReadState->stripeBuffers->rowCount; + + MemoryContextSwitchTo(oldContext); + + return stripeReadState; +} + + + +static StripeReadState * +BeginBufferRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDesc, + List *projectedColumnList, MemoryContext stripeReadContext) +{ + MemoryContext oldContext = MemoryContextSwitchTo(stripeReadContext); + + StripeReadState *stripeReadState = makeBasicStripeReadState(rel, tupleDesc, + projectedColumnList, + stripeReadContext); + + stripeReadState->stripeBuffers = + ColumnarWriteStripeBuffers(FindWriteStateByStripeId(rel->rd_node.relNode, + stripeMetadata->id)); + stripeReadState->rowCount = stripeReadState->stripeBuffers->rowCount; + + MemoryContextSwitchTo(oldContext); + + return stripeReadState; +} + + + +static StripeReadState * +makeBasicStripeReadState(Relation rel, TupleDesc tupleDesc, + List *projectedColumnList, + MemoryContext stripeReadContext) +{ StripeReadState *stripeReadState = palloc0(sizeof(StripeReadState)); stripeReadState->relation = rel; @@ -660,21 +819,6 @@ BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDes stripeReadState->projectedColumnList = projectedColumnList; stripeReadState->stripeReadContext = stripeReadContext; - stripeReadState->stripeBuffers = LoadFilteredStripeBuffers(rel, - stripeMetadata, - tupleDesc, - projectedColumnList, - whereClauseList, - whereClauseVars, - &stripeReadState-> - chunkGroupsFiltered, - snapshot); - - stripeReadState->rowCount = stripeReadState->stripeBuffers->rowCount; - - MemoryContextSwitchTo(oldContext); - - return stripeReadState; } @@ -924,7 +1068,7 @@ CreateEmptyChunkData(uint32 columnCount, bool *columnMask, uint32 chunkGroupRowC chunkData->valueArray = palloc0(columnCount * sizeof(Datum *)); chunkData->valueBufferArray = palloc0(columnCount * sizeof(StringInfo)); chunkData->columnCount = columnCount; - chunkData->rowCount = chunkGroupRowCount; + chunkData->rowCount = 0; /* allocate chunk memory for deserialized data */ for (columnIndex = 0; columnIndex < columnCount; columnIndex++) diff --git a/src/backend/columnar/columnar_tableam.c b/src/backend/columnar/columnar_tableam.c index 98578612a..32423db69 100644 --- a/src/backend/columnar/columnar_tableam.c +++ b/src/backend/columnar/columnar_tableam.c @@ -545,26 +545,13 @@ columnar_index_fetch_tuple(struct IndexFetchTableData *sscan, { if (stripeMetadata->insertedByCurrentXact) { - /* - * Stripe write is in progress and its entry is inserted by current - * transaction, so obviously it must be written by me. Since caller - * might want to use tupleslot datums for some reason, do another - * look-up, but this time by first flushing our writes. - * - * XXX: For index scan, this is the only case that we flush pending - * writes of the current backend. If we have taught reader how to - * read from WriteStateMap. then we could guarantee that - * index_fetch_tuple would never flush pending writes, but this seem - * to be too much work for now, but should be doable. - */ - ColumnarReadFlushPendingWrites(scan->cs_readState); + Snapshot oldSnapshot = ColumnarReadSetSnapshot(scan->cs_readState, SnapshotSelf); + ColumnarReadBufferByRowNumber(scan->cs_readState, rowNumber, + slot->tts_values, slot->tts_isnull); + ColumnarReadSetSnapshot(scan->cs_readState, oldSnapshot); - /* - * Fill the tupleslot and fall through to return true, it - * certainly exists. - */ - ColumnarReadRowByRowNumberOrError(scan->cs_readState, rowNumber, - slot->tts_values, slot->tts_isnull); + /* make sure to copy slot out of writer's memory context */ + slot->tts_ops->materialize(slot); } else { diff --git a/src/backend/columnar/columnar_writer.c b/src/backend/columnar/columnar_writer.c index 1db407b6b..99598d1cc 100644 --- a/src/backend/columnar/columnar_writer.c +++ b/src/backend/columnar/columnar_writer.c @@ -217,6 +217,7 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu char columnTypeAlign = attributeForm->attalign; chunkData->existsArray[columnIndex][chunkRowIndex] = true; + chunkData->valueArray[columnIndex][chunkRowIndex] = columnValues[columnIndex]; SerializeSingleDatum(chunkData->valueBufferArray[columnIndex], columnValues[columnIndex], columnTypeByValue, @@ -230,6 +231,8 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu chunkSkipNode->rowCount++; } + chunkData->rowCount++; + stripeSkipList->chunkCount = chunkIndex + 1; /* last row of the chunk is inserted serialize the chunk */ @@ -241,6 +244,7 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu uint64 writtenRowNumber = writeState->emptyStripeReservation->stripeFirstRowNumber + stripeBuffers->rowCount; stripeBuffers->rowCount++; + stripeBuffers->selectedChunkGroupRowCounts[chunkIndex]++; if (stripeBuffers->rowCount >= options->stripeRowCount) { ColumnarFlushPendingWrites(writeState); @@ -300,6 +304,33 @@ ColumnarWritePerTupleContext(ColumnarWriteState *state) } +uint64 +ColumnarWriteStripeId(ColumnarWriteState *writeState) +{ + return writeState->emptyStripeReservation->stripeId; +} + + +StripeBuffers * +ColumnarWriteStripeBuffers(ColumnarWriteState *writeState) +{ + return writeState->stripeBuffers; +} + + +uint64 +ColumnarWriteSerializedRowCount(ColumnarWriteState *writeState) +{ + return writeState->stripeBuffers->rowCount - writeState->chunkData->rowCount; +} + + +ChunkData * +ColumnarWriteChunkData(ColumnarWriteState *writeState) +{ + return writeState->chunkData; +} + /* * CreateEmptyStripeBuffers allocates an empty StripeBuffers structure with the given * column count. @@ -335,6 +366,9 @@ CreateEmptyStripeBuffers(uint32 stripeMaxRowCount, uint32 chunkRowCount, stripeBuffers->columnCount = columnCount; stripeBuffers->rowCount = 0; + stripeBuffers->selectedChunkGroupRowCounts = + palloc0((1 + ((stripeMaxRowCount - 1) / chunkRowCount)) * sizeof(uint32)); + return stripeBuffers; } @@ -632,6 +666,7 @@ SerializeChunkData(ColumnarWriteState *writeState, uint32 chunkIndex, uint32 row /* valueBuffer needs to be reset for next chunk's data */ resetStringInfo(chunkData->valueBufferArray[columnIndex]); + chunkData->rowCount = 0; } } diff --git a/src/backend/columnar/write_state_management.c b/src/backend/columnar/write_state_management.c index 201a1a479..c9af4e4ca 100644 --- a/src/backend/columnar/write_state_management.c +++ b/src/backend/columnar/write_state_management.c @@ -196,6 +196,35 @@ columnar_init_write_state(Relation relation, TupleDesc tupdesc, } + +ColumnarWriteState * +FindWriteStateByStripeId(Oid relNode, uint64 stripeId) +{ + bool found = false; + WriteStateMapEntry *hashEntry = hash_search(WriteStateMap, &relNode, + HASH_FIND, &found); + if (!found) + { + ereport(ERROR, (errmsg("not found 1"))); + } + + SubXidWriteState *stackHead = hashEntry->writeStateStack; + while (stackHead) + { + ColumnarWriteState *writeState = stackHead->writeState; + if (stripeId != ColumnarWriteStripeId(writeState)) + { + stackHead = stackHead->next; + continue; + } + + return writeState; + } + + ereport(ERROR, (errmsg("not found 2"))); +} + + /* * Flushes pending writes for given relfilenode in the given subtransaction. */ diff --git a/src/include/columnar/columnar.h b/src/include/columnar/columnar.h index fff84503b..b35b1ce1a 100644 --- a/src/include/columnar/columnar.h +++ b/src/include/columnar/columnar.h @@ -228,6 +228,10 @@ extern void ColumnarFlushPendingWrites(ColumnarWriteState *state); extern void ColumnarEndWrite(ColumnarWriteState *state); extern bool ContainsPendingWrites(ColumnarWriteState *state); extern MemoryContext ColumnarWritePerTupleContext(ColumnarWriteState *state); +extern uint64 ColumnarWriteStripeId(ColumnarWriteState *writeState); +extern StripeBuffers * ColumnarWriteStripeBuffers(ColumnarWriteState *writeState); +extern uint64 ColumnarWriteSerializedRowCount(ColumnarWriteState *writeState); +extern ChunkData *ColumnarWriteChunkData(ColumnarWriteState *writeState); /* Function declarations for reading from columnar table */ @@ -256,6 +260,10 @@ extern void ColumnarReadRowByRowNumberOrError(ColumnarReadState *readState, extern bool ColumnarReadRowByRowNumber(ColumnarReadState *readState, uint64 rowNumber, Datum *columnValues, bool *columnNulls); +extern void ColumnarReadBufferByRowNumber(ColumnarReadState *readState, + uint64 rowNumber, Datum *columnValues, + bool *columnNulls); +extern Snapshot ColumnarReadSetSnapshot(ColumnarReadState *readState, Snapshot snapshot); /* Function declarations for common functions */ extern FmgrInfo * GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId, @@ -310,6 +318,7 @@ extern Datum columnar_relation_storageid(PG_FUNCTION_ARGS); extern ColumnarWriteState * columnar_init_write_state(Relation relation, TupleDesc tupdesc, SubTransactionId currentSubXid); +extern ColumnarWriteState * FindWriteStateByStripeId(Oid relNode, uint64 stripeId); extern void FlushWriteStateForRelfilenode(Oid relfilenode, SubTransactionId currentSubXid); extern void FlushWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId