Refactor FindStripeByRowNumber into StripeMetadataLookupRowNumber

Push the most logic in FindStripeByRowNumber down to an helper function
to re-use it in next commit.
pull/5088/head
Onur Tirtir 2021-07-02 17:52:21 +03:00
parent 2954fb0ee8
commit 16dee73b10
1 changed files with 63 additions and 10 deletions

View File

@ -66,6 +66,16 @@ typedef struct
EState *estate;
} ModifyState;
/* RowNumberLookupMode to be used in StripeMetadataLookupRowNumber */
typedef enum RowNumberLookupMode
{
/*
* Find the stripe whose firstRowNumber is less than or equal to given
* input rowNumber.
*/
FIND_LESS_OR_EQUAL
} RowNumberLookupMode;
static void InsertStripeMetadataRow(uint64 storageId, StripeMetadata *stripe);
static void GetHighestUsedAddressAndId(uint64 storageId,
uint64 *highestUsedAddress,
@ -100,6 +110,9 @@ static EState * create_estate_for_relation(Relation rel);
static bytea * DatumToBytea(Datum value, Form_pg_attribute attrForm);
static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm);
static bool WriteColumnarOptions(Oid regclass, ColumnarOptions *options, bool overwrite);
static StripeMetadata * StripeMetadataLookupRowNumber(Relation relation, uint64 rowNumber,
Snapshot snapshot,
RowNumberLookupMode lookupMode);
PG_FUNCTION_INFO_V1(columnar_relation_storageid);
@ -622,21 +635,60 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri
/*
* FindStripeByRowNumber returns StripeMetadata for the stripe that has the
* row with rowNumber by doing backward index scan on
* stripe_first_row_number_idx. If no such row exists, then returns NULL.
* FindStripeByRowNumber returns StripeMetadata for the stripe that contains
* the row with rowNumber. If no such stripe exists, then returns NULL.
*/
StripeMetadata *
FindStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot)
{
StripeMetadata *stripeMetadata =
StripeMetadataLookupRowNumber(relation, rowNumber,
snapshot, FIND_LESS_OR_EQUAL);
if (!stripeMetadata)
{
return NULL;
}
uint64 stripeMaxRowNumber = stripeMetadata->firstRowNumber +
stripeMetadata->rowCount - 1;
if (rowNumber > stripeMaxRowNumber)
{
return NULL;
}
return stripeMetadata;
}
/*
* StripeMetadataLookupRowNumber returns StripeMetadata for the stripe whose
* firstRowNumber is less than or equal to (FIND_LESS_OR_EQUAL) given rowNumber
* by doing backward index scan on stripe_first_row_number_idx.
* If no such stripe exists, then returns NULL.
*/
static StripeMetadata *
StripeMetadataLookupRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot,
RowNumberLookupMode lookupMode)
{
Assert(lookupMode == FIND_LESS_OR_EQUAL);
StripeMetadata *foundStripeMetadata = NULL;
uint64 storageId = ColumnarStorageGetStorageId(relation, false);
ScanKeyData scanKey[2];
ScanKeyInit(&scanKey[0], Anum_columnar_stripe_storageid,
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(storageId));
StrategyNumber strategyNumber = InvalidStrategy;
RegProcedure procedure = InvalidOid;
if (lookupMode == FIND_LESS_OR_EQUAL)
{
strategyNumber = BTLessEqualStrategyNumber;
procedure = F_INT8LE;
}
ScanKeyInit(&scanKey[1], Anum_columnar_stripe_first_row_number,
BTLessEqualStrategyNumber, F_INT8LE, UInt64GetDatum(rowNumber));
strategyNumber, procedure, UInt64GetDatum(rowNumber));
Relation columnarStripes = table_open(ColumnarStripeRelationId(), AccessShareLock);
Relation index = index_open(ColumnarStripeFirstRowNumberIndexRelationId(),
@ -645,7 +697,12 @@ FindStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot)
snapshot, 2,
scanKey);
HeapTuple heapTuple = systable_getnext_ordered(scanDescriptor, BackwardScanDirection);
ScanDirection scanDirection = NoMovementScanDirection;
if (lookupMode == FIND_LESS_OR_EQUAL)
{
scanDirection = BackwardScanDirection;
}
HeapTuple heapTuple = systable_getnext_ordered(scanDescriptor, scanDirection);
if (HeapTupleIsValid(heapTuple))
{
TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes);
@ -653,11 +710,7 @@ FindStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot)
bool isNullArray[Natts_columnar_stripe];
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
StripeMetadata *stripeMetadata = BuildStripeMetadata(datumArray);
if (rowNumber < stripeMetadata->firstRowNumber + stripeMetadata->rowCount)
{
foundStripeMetadata = stripeMetadata;
}
foundStripeMetadata = BuildStripeMetadata(datumArray);
}
systable_endscan_ordered(scanDescriptor);