diff --git a/src/backend/columnar/columnar_reader.c b/src/backend/columnar/columnar_reader.c index dec19929b..2a3ab83aa 100644 --- a/src/backend/columnar/columnar_reader.c +++ b/src/backend/columnar/columnar_reader.c @@ -96,6 +96,8 @@ struct ColumnarReadState Snapshot snapshot; bool snapshotRegisteredByUs; + + ColumnarWriteState *cachedWriteState; }; /* static function declarations */ @@ -119,6 +121,12 @@ static StripeReadState * BeginStripeRead(StripeMetadata *stripeMetadata, Relatio List *whereClauseList, List *whereClauseVars, MemoryContext stripeReadContext, Snapshot snapshot); +static StripeReadState * BeginBufferRead(StripeMetadata *stripeMetadata, Relation rel, + TupleDesc tupleDesc, List *projectedColumnList, + MemoryContext stripeReadContext); +static StripeReadState * makeBasicStripeReadState(Relation rel, TupleDesc tupleDesc, + List *projectedColumnList, + MemoryContext stripeReadContext); static void AdvanceStripeRead(ColumnarReadState *readState); static bool SnapshotMightSeeUnflushedStripes(Snapshot snapshot); static bool ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues, @@ -440,6 +448,108 @@ ColumnarReadRowByRowNumber(ColumnarReadState *readState, } +struct ColumnarWriteState +{ + TupleDesc tupleDescriptor; + FmgrInfo **comparisonFunctionArray; + RelFileNode relfilenode; + + MemoryContext stripeWriteContext; + MemoryContext perTupleContext; + StripeBuffers *stripeBuffers; + StripeSkipList *stripeSkipList; + EmptyStripeReservation *emptyStripeReservation; + ColumnarOptions options; + ChunkData *chunkData; + + List *chunkGroupRowCounts; + + /* + * compressionBuffer buffer is used as temporary storage during + * data value compression operation. It is kept here to minimize + * memory allocations. It lives in stripeWriteContext and gets + * deallocated when memory context is reset. + */ + StringInfo compressionBuffer; +}; + +void +ColumnarReadBufferByRowNumber(ColumnarReadState *readState, + uint64 rowNumber, Datum *columnValues, + bool *columnNulls) +{ + Relation columnarRelation = readState->relation; + + bool stripeCached = + readState->cachedWriteState && + rowNumber >= readState->cachedWriteState->emptyStripeReservation->stripeFirstRowNumber && + rowNumber < readState->cachedWriteState->emptyStripeReservation->stripeFirstRowNumber + + readState->cachedWriteState->stripeBuffers->rowCount; + Assert(!stripeCached || (readState->currentStripeMetadata && + readState->currentStripeMetadata->id == + readState->cachedWriteState->emptyStripeReservation->stripeId)); + + if (!stripeCached) + { + readState->currentStripeMetadata = FindStripeWithMatchingFirstRowNumber(columnarRelation, + rowNumber, + readState->snapshot); + if (!readState->currentStripeMetadata) + { + ereport(ERROR, (errmsg("not found 3"))); + } + + + readState->cachedWriteState = FindWriteStateByStripeId(columnarRelation->rd_node.relNode, + readState->currentStripeMetadata->id); + if (!readState->cachedWriteState) + { + ereport(ERROR, (errmsg("not found 4"))); + } + + TupleDesc relationTupleDesc = RelationGetDescr(columnarRelation); + readState->stripeReadState = BeginBufferRead(readState->currentStripeMetadata, + columnarRelation, + relationTupleDesc, + readState->projectedColumnList, + readState->stripeReadContext); + } + + uint64 rowOffset = rowNumber - readState->currentStripeMetadata->firstRowNumber; + uint64 serializedRowCount = ColumnarWriteSerializedRowCount(readState->cachedWriteState); + if (rowOffset < serializedRowCount) + { + ReadStripeRowByRowNumber(readState, rowNumber, columnValues, columnNulls); + } + else + { + ChunkData *chunkData = ColumnarWriteChunkData(readState->cachedWriteState); + uint64 chunkDataOffset = rowOffset - serializedRowCount + 1; + for (uint32 columnIndex = 0; columnIndex < chunkData->columnCount; columnIndex++) + { + if (chunkData->existsArray[columnIndex][chunkDataOffset]) + { + columnValues[columnIndex] = chunkData->valueArray[columnIndex][chunkDataOffset]; + columnNulls[columnIndex] = false; + } + else + { + columnNulls[columnIndex] = true; + } + } + } +} + + +Snapshot +ColumnarReadSetSnapshot(ColumnarReadState *readState, Snapshot snapshot) +{ + Snapshot oldSnapshot = readState->snapshot; + readState->snapshot = snapshot; + return oldSnapshot; +} + + /* * ColumnarReadIsCurrentStripe returns true if stripe being read contains * row with given rowNumber. @@ -651,6 +761,55 @@ BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDes { MemoryContext oldContext = MemoryContextSwitchTo(stripeReadContext); + StripeReadState *stripeReadState = makeBasicStripeReadState(rel, tupleDesc, + projectedColumnList, + stripeReadContext); + + stripeReadState->stripeBuffers = LoadFilteredStripeBuffers(rel, + stripeMetadata, + tupleDesc, + projectedColumnList, + whereClauseList, + whereClauseVars, + &stripeReadState-> + chunkGroupsFiltered, + snapshot); + stripeReadState->rowCount = stripeReadState->stripeBuffers->rowCount; + + MemoryContextSwitchTo(oldContext); + + return stripeReadState; +} + + + +static StripeReadState * +BeginBufferRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDesc, + List *projectedColumnList, MemoryContext stripeReadContext) +{ + MemoryContext oldContext = MemoryContextSwitchTo(stripeReadContext); + + StripeReadState *stripeReadState = makeBasicStripeReadState(rel, tupleDesc, + projectedColumnList, + stripeReadContext); + + stripeReadState->stripeBuffers = + ColumnarWriteStripeBuffers(FindWriteStateByStripeId(rel->rd_node.relNode, + stripeMetadata->id)); + stripeReadState->rowCount = stripeReadState->stripeBuffers->rowCount; + + MemoryContextSwitchTo(oldContext); + + return stripeReadState; +} + + + +static StripeReadState * +makeBasicStripeReadState(Relation rel, TupleDesc tupleDesc, + List *projectedColumnList, + MemoryContext stripeReadContext) +{ StripeReadState *stripeReadState = palloc0(sizeof(StripeReadState)); stripeReadState->relation = rel; @@ -660,21 +819,6 @@ BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDes stripeReadState->projectedColumnList = projectedColumnList; stripeReadState->stripeReadContext = stripeReadContext; - stripeReadState->stripeBuffers = LoadFilteredStripeBuffers(rel, - stripeMetadata, - tupleDesc, - projectedColumnList, - whereClauseList, - whereClauseVars, - &stripeReadState-> - chunkGroupsFiltered, - snapshot); - - stripeReadState->rowCount = stripeReadState->stripeBuffers->rowCount; - - MemoryContextSwitchTo(oldContext); - - return stripeReadState; } @@ -924,7 +1068,7 @@ CreateEmptyChunkData(uint32 columnCount, bool *columnMask, uint32 chunkGroupRowC chunkData->valueArray = palloc0(columnCount * sizeof(Datum *)); chunkData->valueBufferArray = palloc0(columnCount * sizeof(StringInfo)); chunkData->columnCount = columnCount; - chunkData->rowCount = chunkGroupRowCount; + chunkData->rowCount = 0; /* allocate chunk memory for deserialized data */ for (columnIndex = 0; columnIndex < columnCount; columnIndex++) diff --git a/src/backend/columnar/columnar_tableam.c b/src/backend/columnar/columnar_tableam.c index 98578612a..32423db69 100644 --- a/src/backend/columnar/columnar_tableam.c +++ b/src/backend/columnar/columnar_tableam.c @@ -545,26 +545,13 @@ columnar_index_fetch_tuple(struct IndexFetchTableData *sscan, { if (stripeMetadata->insertedByCurrentXact) { - /* - * Stripe write is in progress and its entry is inserted by current - * transaction, so obviously it must be written by me. Since caller - * might want to use tupleslot datums for some reason, do another - * look-up, but this time by first flushing our writes. - * - * XXX: For index scan, this is the only case that we flush pending - * writes of the current backend. If we have taught reader how to - * read from WriteStateMap. then we could guarantee that - * index_fetch_tuple would never flush pending writes, but this seem - * to be too much work for now, but should be doable. - */ - ColumnarReadFlushPendingWrites(scan->cs_readState); + Snapshot oldSnapshot = ColumnarReadSetSnapshot(scan->cs_readState, SnapshotSelf); + ColumnarReadBufferByRowNumber(scan->cs_readState, rowNumber, + slot->tts_values, slot->tts_isnull); + ColumnarReadSetSnapshot(scan->cs_readState, oldSnapshot); - /* - * Fill the tupleslot and fall through to return true, it - * certainly exists. - */ - ColumnarReadRowByRowNumberOrError(scan->cs_readState, rowNumber, - slot->tts_values, slot->tts_isnull); + /* make sure to copy slot out of writer's memory context */ + slot->tts_ops->materialize(slot); } else { diff --git a/src/backend/columnar/columnar_writer.c b/src/backend/columnar/columnar_writer.c index 1db407b6b..99598d1cc 100644 --- a/src/backend/columnar/columnar_writer.c +++ b/src/backend/columnar/columnar_writer.c @@ -217,6 +217,7 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu char columnTypeAlign = attributeForm->attalign; chunkData->existsArray[columnIndex][chunkRowIndex] = true; + chunkData->valueArray[columnIndex][chunkRowIndex] = columnValues[columnIndex]; SerializeSingleDatum(chunkData->valueBufferArray[columnIndex], columnValues[columnIndex], columnTypeByValue, @@ -230,6 +231,8 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu chunkSkipNode->rowCount++; } + chunkData->rowCount++; + stripeSkipList->chunkCount = chunkIndex + 1; /* last row of the chunk is inserted serialize the chunk */ @@ -241,6 +244,7 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu uint64 writtenRowNumber = writeState->emptyStripeReservation->stripeFirstRowNumber + stripeBuffers->rowCount; stripeBuffers->rowCount++; + stripeBuffers->selectedChunkGroupRowCounts[chunkIndex]++; if (stripeBuffers->rowCount >= options->stripeRowCount) { ColumnarFlushPendingWrites(writeState); @@ -300,6 +304,33 @@ ColumnarWritePerTupleContext(ColumnarWriteState *state) } +uint64 +ColumnarWriteStripeId(ColumnarWriteState *writeState) +{ + return writeState->emptyStripeReservation->stripeId; +} + + +StripeBuffers * +ColumnarWriteStripeBuffers(ColumnarWriteState *writeState) +{ + return writeState->stripeBuffers; +} + + +uint64 +ColumnarWriteSerializedRowCount(ColumnarWriteState *writeState) +{ + return writeState->stripeBuffers->rowCount - writeState->chunkData->rowCount; +} + + +ChunkData * +ColumnarWriteChunkData(ColumnarWriteState *writeState) +{ + return writeState->chunkData; +} + /* * CreateEmptyStripeBuffers allocates an empty StripeBuffers structure with the given * column count. @@ -335,6 +366,9 @@ CreateEmptyStripeBuffers(uint32 stripeMaxRowCount, uint32 chunkRowCount, stripeBuffers->columnCount = columnCount; stripeBuffers->rowCount = 0; + stripeBuffers->selectedChunkGroupRowCounts = + palloc0((1 + ((stripeMaxRowCount - 1) / chunkRowCount)) * sizeof(uint32)); + return stripeBuffers; } @@ -632,6 +666,7 @@ SerializeChunkData(ColumnarWriteState *writeState, uint32 chunkIndex, uint32 row /* valueBuffer needs to be reset for next chunk's data */ resetStringInfo(chunkData->valueBufferArray[columnIndex]); + chunkData->rowCount = 0; } } diff --git a/src/backend/columnar/write_state_management.c b/src/backend/columnar/write_state_management.c index 201a1a479..c9af4e4ca 100644 --- a/src/backend/columnar/write_state_management.c +++ b/src/backend/columnar/write_state_management.c @@ -196,6 +196,35 @@ columnar_init_write_state(Relation relation, TupleDesc tupdesc, } + +ColumnarWriteState * +FindWriteStateByStripeId(Oid relNode, uint64 stripeId) +{ + bool found = false; + WriteStateMapEntry *hashEntry = hash_search(WriteStateMap, &relNode, + HASH_FIND, &found); + if (!found) + { + ereport(ERROR, (errmsg("not found 1"))); + } + + SubXidWriteState *stackHead = hashEntry->writeStateStack; + while (stackHead) + { + ColumnarWriteState *writeState = stackHead->writeState; + if (stripeId != ColumnarWriteStripeId(writeState)) + { + stackHead = stackHead->next; + continue; + } + + return writeState; + } + + ereport(ERROR, (errmsg("not found 2"))); +} + + /* * Flushes pending writes for given relfilenode in the given subtransaction. */ diff --git a/src/include/columnar/columnar.h b/src/include/columnar/columnar.h index fff84503b..b35b1ce1a 100644 --- a/src/include/columnar/columnar.h +++ b/src/include/columnar/columnar.h @@ -228,6 +228,10 @@ extern void ColumnarFlushPendingWrites(ColumnarWriteState *state); extern void ColumnarEndWrite(ColumnarWriteState *state); extern bool ContainsPendingWrites(ColumnarWriteState *state); extern MemoryContext ColumnarWritePerTupleContext(ColumnarWriteState *state); +extern uint64 ColumnarWriteStripeId(ColumnarWriteState *writeState); +extern StripeBuffers * ColumnarWriteStripeBuffers(ColumnarWriteState *writeState); +extern uint64 ColumnarWriteSerializedRowCount(ColumnarWriteState *writeState); +extern ChunkData *ColumnarWriteChunkData(ColumnarWriteState *writeState); /* Function declarations for reading from columnar table */ @@ -256,6 +260,10 @@ extern void ColumnarReadRowByRowNumberOrError(ColumnarReadState *readState, extern bool ColumnarReadRowByRowNumber(ColumnarReadState *readState, uint64 rowNumber, Datum *columnValues, bool *columnNulls); +extern void ColumnarReadBufferByRowNumber(ColumnarReadState *readState, + uint64 rowNumber, Datum *columnValues, + bool *columnNulls); +extern Snapshot ColumnarReadSetSnapshot(ColumnarReadState *readState, Snapshot snapshot); /* Function declarations for common functions */ extern FmgrInfo * GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId, @@ -310,6 +318,7 @@ extern Datum columnar_relation_storageid(PG_FUNCTION_ARGS); extern ColumnarWriteState * columnar_init_write_state(Relation relation, TupleDesc tupdesc, SubTransactionId currentSubXid); +extern ColumnarWriteState * FindWriteStateByStripeId(Oid relNode, uint64 stripeId); extern void FlushWriteStateForRelfilenode(Oid relfilenode, SubTransactionId currentSubXid); extern void FlushWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId