diff --git a/src/backend/columnar/columnar_metadata.c b/src/backend/columnar/columnar_metadata.c index 1150dc1ac..9d0dc2658 100644 --- a/src/backend/columnar/columnar_metadata.c +++ b/src/backend/columnar/columnar_metadata.c @@ -88,7 +88,7 @@ static void GetHighestUsedAddressAndId(uint64 storageId, static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot); static StripeMetadata * BuildStripeMetadata(Datum *datumArray); static uint32 * ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 - chunkGroupCount); + chunkGroupCount, Snapshot snapshot); static Oid ColumnarStorageIdSequenceRelationId(void); static Oid ColumnarStripeRelationId(void); static Oid ColumnarStripePKeyIndexRelationId(void); @@ -532,7 +532,7 @@ SaveChunkGroups(RelFileNode relfilenode, uint64 stripe, */ StripeSkipList * ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescriptor, - uint32 chunkCount) + uint32 chunkCount, Snapshot snapshot) { int32 columnIndex = 0; HeapTuple heapTuple = NULL; @@ -550,8 +550,8 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri ScanKeyInit(&scanKey[1], Anum_columnar_chunk_stripe, BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe)); - SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarChunk, index, NULL, - 2, scanKey); + SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarChunk, index, + snapshot, 2, scanKey); StripeSkipList *chunkList = palloc0(sizeof(StripeSkipList)); chunkList->chunkCount = chunkCount; @@ -634,7 +634,7 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri table_close(columnarChunk, AccessShareLock); chunkList->chunkGroupRowCounts = - ReadChunkGroupRowCounts(storageId, stripe, chunkCount); + ReadChunkGroupRowCounts(storageId, stripe, chunkCount, snapshot); return chunkList; } @@ -803,7 +803,8 @@ FindStripeWithHighestRowNumber(Relation relation, Snapshot snapshot) * given stripe. 
*/ static uint32 * -ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount) +ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount, + Snapshot snapshot) { Oid columnarChunkGroupOid = ColumnarChunkGroupRelationId(); Relation columnarChunkGroup = table_open(columnarChunkGroupOid, AccessShareLock); @@ -816,7 +817,7 @@ ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount) BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe)); SysScanDesc scanDescriptor = - systable_beginscan_ordered(columnarChunkGroup, index, NULL, 2, scanKey); + systable_beginscan_ordered(columnarChunkGroup, index, snapshot, 2, scanKey); uint32 chunkGroupIndex = 0; HeapTuple heapTuple = NULL; diff --git a/src/backend/columnar/columnar_reader.c b/src/backend/columnar/columnar_reader.c index 96efdbffc..87b4131ce 100644 --- a/src/backend/columnar/columnar_reader.c +++ b/src/backend/columnar/columnar_reader.c @@ -88,6 +88,8 @@ struct ColumnarReadState * itself. 
*/ MemoryContext scanContext; + + Snapshot snapshot; }; /* static function declarations */ @@ -109,7 +111,8 @@ static bool HasUnreadStripe(ColumnarReadState *readState); static StripeReadState * BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDesc, List *projectedColumnList, List *whereClauseList, List *whereClauseVars, - MemoryContext stripeReadContext); + MemoryContext stripeReadContext, + Snapshot snapshot); static void AdvanceStripeRead(ColumnarReadState *readState); static bool ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues, bool *columnNulls); @@ -128,7 +131,8 @@ static StripeBuffers * LoadFilteredStripeBuffers(Relation relation, List *projectedColumnList, List *whereClauseList, List *whereClauseVars, - int64 *chunkGroupsFiltered); + int64 *chunkGroupsFiltered, + Snapshot snapshot); static ColumnBuffers * LoadColumnBuffers(Relation relation, ColumnChunkSkipNode *chunkSkipNodeArray, uint32 chunkCount, uint64 stripeOffset, @@ -167,7 +171,7 @@ static Datum ColumnDefaultValue(TupleConstr *tupleConstraints, ColumnarReadState * ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor, List *projectedColumnList, List *whereClauseList, - MemoryContext scanContext) + MemoryContext scanContext, Snapshot snapshot) { /* * We allocate all stripe specific data in the stripeReadContext, and reset @@ -187,8 +191,9 @@ ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor, readState->stripeReadState = NULL; readState->currentStripeMetadata = FindNextStripeByRowNumber(relation, COLUMNAR_INVALID_ROW_NUMBER, - GetTransactionSnapshot()); + snapshot); readState->scanContext = scanContext; + readState->snapshot = snapshot; return readState; } @@ -230,7 +235,8 @@ ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *col readState->projectedColumnList, readState->whereClauseList, readState->whereClauseVars, - readState->stripeReadContext); + readState->stripeReadContext, + readState->snapshot); 
} if (!ReadStripeNextRow(readState->stripeReadState, columnValues, columnNulls)) @@ -260,11 +266,12 @@ ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *col bool ColumnarReadRowByRowNumber(ColumnarReadState *readState, uint64 rowNumber, Datum *columnValues, - bool *columnNulls, Snapshot snapshot) + bool *columnNulls) { if (!ColumnarReadIsCurrentStripe(readState, rowNumber)) { Relation columnarRelation = readState->relation; + Snapshot snapshot = readState->snapshot; StripeMetadata *stripeMetadata = FindStripeByRowNumber(columnarRelation, rowNumber, snapshot); if (stripeMetadata == NULL) @@ -286,7 +293,8 @@ ColumnarReadRowByRowNumber(ColumnarReadState *readState, readState->projectedColumnList, whereClauseList, whereClauseVars, - stripeReadContext); + stripeReadContext, + snapshot); readState->currentStripeMetadata = stripeMetadata; } @@ -446,7 +454,7 @@ ColumnarRescan(ColumnarReadState *readState) ColumnarResetRead(readState); readState->currentStripeMetadata = FindNextStripeByRowNumber(readState->relation, COLUMNAR_INVALID_ROW_NUMBER, - GetTransactionSnapshot()); + readState->snapshot); readState->chunkGroupsFiltered = 0; MemoryContextSwitchTo(oldContext); @@ -493,7 +501,7 @@ ColumnarResetRead(ColumnarReadState *readState) static StripeReadState * BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDesc, List *projectedColumnList, List *whereClauseList, List *whereClauseVars, - MemoryContext stripeReadContext) + MemoryContext stripeReadContext, Snapshot snapshot) { MemoryContext oldContext = MemoryContextSwitchTo(stripeReadContext); @@ -513,7 +521,8 @@ BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDes whereClauseList, whereClauseVars, &stripeReadState-> - chunkGroupsFiltered); + chunkGroupsFiltered, + snapshot); stripeReadState->rowCount = stripeReadState->stripeBuffers->rowCount; @@ -540,7 +549,7 @@ AdvanceStripeRead(ColumnarReadState *readState) 
StripeGetHighestRowNumber(readState->currentStripeMetadata); readState->currentStripeMetadata = FindNextStripeByRowNumber(readState->relation, lastReadRowNumber, - GetTransactionSnapshot()); + readState->snapshot); readState->stripeReadState = NULL; MemoryContextReset(readState->stripeReadContext); @@ -792,7 +801,7 @@ static StripeBuffers * LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata, TupleDesc tupleDescriptor, List *projectedColumnList, List *whereClauseList, List *whereClauseVars, - int64 *chunkGroupsFiltered) + int64 *chunkGroupsFiltered, Snapshot snapshot) { uint32 columnIndex = 0; uint32 columnCount = tupleDescriptor->natts; @@ -802,7 +811,8 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata, StripeSkipList *stripeSkipList = ReadStripeSkipList(relation->rd_node, stripeMetadata->id, tupleDescriptor, - stripeMetadata->chunkCount); + stripeMetadata->chunkCount, + snapshot); bool *selectedChunkMask = SelectedChunkMask(stripeSkipList, whereClauseList, whereClauseVars, chunkGroupsFiltered); diff --git a/src/backend/columnar/columnar_tableam.c b/src/backend/columnar/columnar_tableam.c index 6630d0430..ea8c6df1e 100644 --- a/src/backend/columnar/columnar_tableam.c +++ b/src/backend/columnar/columnar_tableam.c @@ -253,13 +253,13 @@ CreateColumnarScanMemoryContext(void) */ static ColumnarReadState * init_columnar_read_state(Relation relation, TupleDesc tupdesc, Bitmapset *attr_needed, - List *scanQual, MemoryContext scanContext) + List *scanQual, MemoryContext scanContext, Snapshot snapshot) { MemoryContext oldContext = MemoryContextSwitchTo(scanContext); List *neededColumnList = NeededColumnsList(tupdesc, attr_needed); ColumnarReadState *readState = ColumnarBeginRead(relation, tupdesc, neededColumnList, - scanQual, scanContext); + scanQual, scanContext, snapshot); MemoryContextSwitchTo(oldContext); @@ -309,7 +309,7 @@ columnar_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlo 
scan->cs_readState = init_columnar_read_state(scan->cs_base.rs_rd, slot->tts_tupleDescriptor, scan->attr_needed, scan->scanQual, - scan->scanContext); + scan->scanContext, scan->cs_base.rs_snapshot); } ExecClearTuple(slot); @@ -503,12 +503,13 @@ columnar_index_fetch_tuple(struct IndexFetchTableData *sscan, scan->cs_readState = init_columnar_read_state(columnarRelation, slot->tts_tupleDescriptor, attr_needed, scanQual, - scan->scanContext); + scan->scanContext, + snapshot); } uint64 rowNumber = tid_to_row_number(*tid); if (!ColumnarReadRowByRowNumber(scan->cs_readState, rowNumber, slot->tts_values, - slot->tts_isnull, snapshot)) + slot->tts_isnull)) { return false; } @@ -550,7 +551,9 @@ static bool columnar_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot, Snapshot snapshot) { - return true; + uint64 rowNumber = tid_to_row_number(slot->tts_tid); + StripeMetadata *stripeMetadata = FindStripeByRowNumber(rel, rowNumber, snapshot); + return stripeMetadata != NULL; } @@ -800,10 +803,13 @@ columnar_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, /* no quals for table rewrite */ List *scanQual = NIL; + /* use SnapshotAny when re-writing table as heapAM does */ + Snapshot snapshot = SnapshotAny; + MemoryContext scanContext = CreateColumnarScanMemoryContext(); ColumnarReadState *readState = init_columnar_read_state(OldHeap, sourceDesc, attr_needed, scanQual, - scanContext); + scanContext, snapshot); Datum *values = palloc0(sourceDesc->natts * sizeof(Datum)); bool *nulls = palloc0(sourceDesc->natts * sizeof(bool)); @@ -911,7 +917,8 @@ LogRelationStats(Relation rel, int elevel) StripeMetadata *stripe = lfirst(stripeMetadataCell); StripeSkipList *skiplist = ReadStripeSkipList(relfilenode, stripe->id, RelationGetDescr(rel), - stripe->chunkCount); + stripe->chunkCount, + GetTransactionSnapshot()); for (uint32 column = 0; column < skiplist->columnCount; column++) { bool attrDropped = tupdesc->attrs[column].attisdropped; diff --git 
a/src/include/columnar/columnar.h b/src/include/columnar/columnar.h index a44072ffe..6ca7450ac 100644 --- a/src/include/columnar/columnar.h +++ b/src/include/columnar/columnar.h @@ -213,13 +213,14 @@ extern ColumnarReadState * ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor, List *projectedColumnList, List *qualConditions, - MemoryContext scanContext); + MemoryContext scanContext, + Snapshot snapshot); extern bool ColumnarReadNextRow(ColumnarReadState *state, Datum *columnValues, bool *columnNulls, uint64 *rowNumber); extern void ColumnarRescan(ColumnarReadState *readState); extern bool ColumnarReadRowByRowNumber(ColumnarReadState *readState, uint64 rowNumber, Datum *columnValues, - bool *columnNulls, Snapshot snapshot); + bool *columnNulls); extern void ColumnarEndRead(ColumnarReadState *state); extern void ColumnarResetRead(ColumnarReadState *readState); extern int64 ColumnarReadChunkGroupsFiltered(ColumnarReadState *state); @@ -255,7 +256,8 @@ extern void SaveChunkGroups(RelFileNode relfilenode, uint64 stripe, List *chunkGroupRowCounts); extern StripeSkipList * ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescriptor, - uint32 chunkCount); + uint32 chunkCount, + Snapshot snapshot); extern StripeMetadata * FindNextStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot); extern StripeMetadata * FindStripeByRowNumber(Relation relation, uint64 rowNumber,