mirror of https://github.com/citusdata/citus.git
apply visibility during reads
parent
d11146ae5b
commit
f8eb0537e2
|
@ -1367,6 +1367,44 @@ ColumnarVisibilityRelationId(void)
|
|||
}
|
||||
|
||||
|
||||
Bitmapset *
|
||||
ReadDeletedTuplesForStripe(RelFileNode relfilenode, uint64 stripenum, Snapshot snapshot)
|
||||
{
|
||||
Bitmapset *deletedTuples = NULL;
|
||||
|
||||
uint64 storageId = LookupStorageId(relfilenode);
|
||||
ScanKeyData scanKey[2];
|
||||
ScanKeyInit(&scanKey[0], Anum_columnar_visibility_storage_id,
|
||||
BTEqualStrategyNumber, F_OIDEQ, UInt64GetDatum(storageId));
|
||||
ScanKeyInit(&scanKey[1], Anum_columnar_visibility_stripe_num,
|
||||
BTEqualStrategyNumber, F_OIDEQ, UInt64GetDatum(stripenum));
|
||||
|
||||
Relation columnarVisibility = table_open(ColumnarVisibilityRelationId(),
|
||||
AccessShareLock);
|
||||
Relation index = index_open(ColumnarVisibilityIndexRelationId(), AccessShareLock);
|
||||
SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarVisibility, index,
|
||||
snapshot, 2, scanKey);
|
||||
HeapTuple heapTuple = NULL;
|
||||
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
|
||||
{
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(columnarVisibility);
|
||||
Datum datumArray[Natts_columnar_visibility];
|
||||
bool isNullArray[Natts_columnar_visibility];
|
||||
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
||||
|
||||
uint64 rownum = DatumGetUInt64(datumArray[Anum_columnar_visibility_row_num - 1]);
|
||||
|
||||
deletedTuples = bms_add_member(deletedTuples, (int) rownum);
|
||||
}
|
||||
|
||||
systable_endscan_ordered(scanDescriptor);
|
||||
index_close(index, AccessShareLock);
|
||||
table_close(columnarVisibility, AccessShareLock);
|
||||
|
||||
return deletedTuples;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ColumnarVisibilityIndexRelationId returns relation id of columnar.visibility_pkey.
|
||||
*/
|
||||
|
|
|
@ -60,6 +60,8 @@ typedef struct StripeReadState
|
|||
StripeBuffers *stripeBuffers; /* allocated in stripeReadContext */
|
||||
List *projectedColumnList; /* borrowed reference */
|
||||
ChunkGroupReadState *chunkGroupReadState; /* owned */
|
||||
|
||||
Bitmapset *deletedTuples;
|
||||
} StripeReadState;
|
||||
|
||||
struct ColumnarReadState
|
||||
|
@ -428,6 +430,9 @@ BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDes
|
|||
|
||||
stripeReadState->rowCount = stripeReadState->stripeBuffers->rowCount;
|
||||
|
||||
stripeReadState->deletedTuples = ReadDeletedTuplesForStripe(rel->rd_node,
|
||||
stripeMetadata->id, NULL);
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
|
||||
|
@ -506,6 +511,13 @@ ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues,
|
|||
continue;
|
||||
}
|
||||
|
||||
/* check if tuple has been removed earlier, read next if that has happened */
|
||||
if (bms_is_member(stripeReadState->currentRow, stripeReadState->deletedTuples))
|
||||
{
|
||||
stripeReadState->currentRow++;
|
||||
continue;
|
||||
}
|
||||
|
||||
stripeReadState->currentRow++;
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -262,6 +262,8 @@ extern StripeMetadata * FindStripeByRowNumber(RelFileNode relfilenode, uint64 ro
|
|||
Snapshot snapshot);
|
||||
extern StripeMetadata * FindStripeWithHighestRowNumber(Relation relation,
|
||||
Snapshot snapshot);
|
||||
extern Bitmapset * ReadDeletedTuplesForStripe(RelFileNode relfilenode, uint64 stripenum,
|
||||
Snapshot snapshot);
|
||||
extern Datum columnar_relation_storageid(PG_FUNCTION_ARGS);
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue