mirror of https://github.com/citusdata/citus.git
Merge pull request #5088 from citusdata/col/refactor-reader
Remove stripeList (list of StripeMetadata) & currentStripe (stripeList index of the current stripe being read) from ColumnarReadState, introduce currentStripeMetadata.pull/5092/head
commit
dfcfa18edc
|
@ -66,6 +66,21 @@ typedef struct
|
|||
EState *estate;
|
||||
} ModifyState;
|
||||
|
||||
/* RowNumberLookupMode to be used in StripeMetadataLookupRowNumber */
|
||||
typedef enum RowNumberLookupMode
|
||||
{
|
||||
/*
|
||||
* Find the stripe whose firstRowNumber is less than or equal to given
|
||||
* input rowNumber.
|
||||
*/
|
||||
FIND_LESS_OR_EQUAL,
|
||||
|
||||
/*
|
||||
* Find the stripe whose firstRowNumber is greater than input rowNumber.
|
||||
*/
|
||||
FIND_GREATER
|
||||
} RowNumberLookupMode;
|
||||
|
||||
static void InsertStripeMetadataRow(uint64 storageId, StripeMetadata *stripe);
|
||||
static void GetHighestUsedAddressAndId(uint64 storageId,
|
||||
uint64 *highestUsedAddress,
|
||||
|
@ -86,7 +101,7 @@ static Oid ColumnarChunkIndexRelationId(void);
|
|||
static Oid ColumnarChunkGroupIndexRelationId(void);
|
||||
static Oid ColumnarNamespaceId(void);
|
||||
static uint64 LookupStorageId(RelFileNode relfilenode);
|
||||
static uint64 GetHighestUsedFirstRowNumber(uint64 storageId);
|
||||
static uint64 GetHighestUsedRowNumber(uint64 storageId);
|
||||
static void DeleteStorageFromColumnarMetadataTable(Oid metadataTableId,
|
||||
AttrNumber storageIdAtrrNumber,
|
||||
Oid storageIdIndexId,
|
||||
|
@ -100,6 +115,9 @@ static EState * create_estate_for_relation(Relation rel);
|
|||
static bytea * DatumToBytea(Datum value, Form_pg_attribute attrForm);
|
||||
static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm);
|
||||
static bool WriteColumnarOptions(Oid regclass, ColumnarOptions *options, bool overwrite);
|
||||
static StripeMetadata * StripeMetadataLookupRowNumber(Relation relation, uint64 rowNumber,
|
||||
Snapshot snapshot,
|
||||
RowNumberLookupMode lookupMode);
|
||||
|
||||
PG_FUNCTION_INFO_V1(columnar_relation_storageid);
|
||||
|
||||
|
@ -622,21 +640,87 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri
|
|||
|
||||
|
||||
/*
|
||||
* FindStripeByRowNumber returns StripeMetadata for the stripe that has the
|
||||
* row with rowNumber by doing backward index scan on
|
||||
* stripe_first_row_number_idx. If no such row exists, then returns NULL.
|
||||
* FindStripeByRowNumber returns StripeMetadata for the stripe whose
|
||||
* firstRowNumber is greater than given rowNumber. If no such stripe
|
||||
* exists, then returns NULL.
|
||||
*/
|
||||
StripeMetadata *
|
||||
FindNextStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot)
|
||||
{
|
||||
return StripeMetadataLookupRowNumber(relation, rowNumber, snapshot, FIND_GREATER);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* FindStripeByRowNumber returns StripeMetadata for the stripe that contains
|
||||
* the row with rowNumber. If no such stripe exists, then returns NULL.
|
||||
*/
|
||||
StripeMetadata *
|
||||
FindStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot)
|
||||
{
|
||||
StripeMetadata *stripeMetadata =
|
||||
StripeMetadataLookupRowNumber(relation, rowNumber,
|
||||
snapshot, FIND_LESS_OR_EQUAL);
|
||||
if (!stripeMetadata)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (rowNumber > StripeGetHighestRowNumber(stripeMetadata))
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return stripeMetadata;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* StripeGetHighestRowNumber returns rowNumber of the row with highest
|
||||
* rowNumber in given stripe.
|
||||
*/
|
||||
uint64
|
||||
StripeGetHighestRowNumber(StripeMetadata *stripeMetadata)
|
||||
{
|
||||
return stripeMetadata->firstRowNumber + stripeMetadata->rowCount - 1;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* StripeMetadataLookupRowNumber returns StripeMetadata for the stripe whose
|
||||
* firstRowNumber is less than or equal to (FIND_LESS_OR_EQUAL), or is
|
||||
* greater than (FIND_GREATER) given rowNumber by doing backward index
|
||||
* scan on stripe_first_row_number_idx.
|
||||
* If no such stripe exists, then returns NULL.
|
||||
*/
|
||||
static StripeMetadata *
|
||||
StripeMetadataLookupRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot,
|
||||
RowNumberLookupMode lookupMode)
|
||||
{
|
||||
Assert(lookupMode == FIND_LESS_OR_EQUAL || lookupMode == FIND_GREATER);
|
||||
|
||||
StripeMetadata *foundStripeMetadata = NULL;
|
||||
|
||||
uint64 storageId = ColumnarStorageGetStorageId(relation, false);
|
||||
ScanKeyData scanKey[2];
|
||||
ScanKeyInit(&scanKey[0], Anum_columnar_stripe_storageid,
|
||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(storageId));
|
||||
|
||||
StrategyNumber strategyNumber = InvalidStrategy;
|
||||
RegProcedure procedure = InvalidOid;
|
||||
if (lookupMode == FIND_LESS_OR_EQUAL)
|
||||
{
|
||||
strategyNumber = BTLessEqualStrategyNumber;
|
||||
procedure = F_INT8LE;
|
||||
}
|
||||
else if (lookupMode == FIND_GREATER)
|
||||
{
|
||||
strategyNumber = BTGreaterStrategyNumber;
|
||||
procedure = F_INT8GT;
|
||||
}
|
||||
ScanKeyInit(&scanKey[1], Anum_columnar_stripe_first_row_number,
|
||||
BTLessEqualStrategyNumber, F_INT8LE, UInt64GetDatum(rowNumber));
|
||||
strategyNumber, procedure, UInt64GetDatum(rowNumber));
|
||||
|
||||
|
||||
Relation columnarStripes = table_open(ColumnarStripeRelationId(), AccessShareLock);
|
||||
Relation index = index_open(ColumnarStripeFirstRowNumberIndexRelationId(),
|
||||
|
@ -645,7 +729,16 @@ FindStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot)
|
|||
snapshot, 2,
|
||||
scanKey);
|
||||
|
||||
HeapTuple heapTuple = systable_getnext_ordered(scanDescriptor, BackwardScanDirection);
|
||||
ScanDirection scanDirection = NoMovementScanDirection;
|
||||
if (lookupMode == FIND_LESS_OR_EQUAL)
|
||||
{
|
||||
scanDirection = BackwardScanDirection;
|
||||
}
|
||||
else if (lookupMode == FIND_GREATER)
|
||||
{
|
||||
scanDirection = ForwardScanDirection;
|
||||
}
|
||||
HeapTuple heapTuple = systable_getnext_ordered(scanDescriptor, scanDirection);
|
||||
if (HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes);
|
||||
|
@ -653,11 +746,7 @@ FindStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot)
|
|||
bool isNullArray[Natts_columnar_stripe];
|
||||
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
||||
|
||||
StripeMetadata *stripeMetadata = BuildStripeMetadata(datumArray);
|
||||
if (rowNumber < stripeMetadata->firstRowNumber + stripeMetadata->rowCount)
|
||||
{
|
||||
foundStripeMetadata = stripeMetadata;
|
||||
}
|
||||
foundStripeMetadata = BuildStripeMetadata(datumArray);
|
||||
}
|
||||
|
||||
systable_endscan_ordered(scanDescriptor);
|
||||
|
@ -1409,41 +1498,33 @@ ColumnarStorageUpdateIfNeeded(Relation rel, bool isUpgrade)
|
|||
|
||||
uint64 reservedStripeId = highestId + 1;
|
||||
uint64 reservedOffset = highestOffset + 1;
|
||||
uint64 reservedRowNumber = GetHighestUsedFirstRowNumber(storageId) + 1;
|
||||
uint64 reservedRowNumber = GetHighestUsedRowNumber(storageId) + 1;
|
||||
ColumnarStorageUpdateCurrent(rel, isUpgrade, reservedStripeId,
|
||||
reservedRowNumber, reservedOffset);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetHighestUsedFirstRowNumber returns the highest used first_row_number
|
||||
* for given storageId. Returns COLUMNAR_INVALID_ROW_NUMBER if storage with
|
||||
* GetHighestUsedRowNumber returns the highest used rowNumber for given
|
||||
* storageId. Returns COLUMNAR_INVALID_ROW_NUMBER if storage with
|
||||
* storageId has no stripes.
|
||||
* Note that normally we would use ColumnarStorageGetReservedRowNumber
|
||||
* to decide that. However, this function is designed to be used when
|
||||
* building the metapage itself during upgrades.
|
||||
*/
|
||||
static uint64
|
||||
GetHighestUsedFirstRowNumber(uint64 storageId)
|
||||
GetHighestUsedRowNumber(uint64 storageId)
|
||||
{
|
||||
uint64 highestRowNumber = COLUMNAR_INVALID_ROW_NUMBER;
|
||||
|
||||
List *stripeMetadataList = ReadDataFileStripeList(storageId,
|
||||
GetTransactionSnapshot());
|
||||
if (list_length(stripeMetadataList) == 0)
|
||||
{
|
||||
return COLUMNAR_INVALID_ROW_NUMBER;
|
||||
}
|
||||
|
||||
/* XXX: Better to have an invalid value for StripeMetadata.rowCount too */
|
||||
uint64 stripeRowCount = -1;
|
||||
uint64 highestFirstRowNumber = COLUMNAR_INVALID_ROW_NUMBER;
|
||||
|
||||
StripeMetadata *stripeMetadata = NULL;
|
||||
foreach_ptr(stripeMetadata, stripeMetadataList)
|
||||
{
|
||||
highestFirstRowNumber = Max(highestFirstRowNumber,
|
||||
stripeMetadata->firstRowNumber);
|
||||
stripeRowCount = stripeMetadata->rowCount;
|
||||
highestRowNumber = Max(highestRowNumber,
|
||||
StripeGetHighestRowNumber(stripeMetadata));
|
||||
}
|
||||
|
||||
return highestFirstRowNumber + stripeRowCount - 1;
|
||||
return highestRowNumber;
|
||||
}
|
||||
|
|
|
@ -64,11 +64,10 @@ typedef struct StripeReadState
|
|||
|
||||
struct ColumnarReadState
|
||||
{
|
||||
List *stripeList;
|
||||
TupleDesc tupleDescriptor;
|
||||
Relation relation;
|
||||
|
||||
int64 currentStripe; /* index of current stripe */
|
||||
StripeMetadata *currentStripeMetadata;
|
||||
StripeReadState *stripeReadState;
|
||||
|
||||
/*
|
||||
|
@ -159,15 +158,6 @@ ColumnarReadState *
|
|||
ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
|
||||
List *projectedColumnList, List *whereClauseList)
|
||||
{
|
||||
List *stripeList = StripesForRelfilenode(relation->rd_node);
|
||||
StripeMetadata *stripeMetadata = NULL;
|
||||
|
||||
uint64 totalRowCount = 0;
|
||||
foreach_ptr(stripeMetadata, stripeList)
|
||||
{
|
||||
totalRowCount += stripeMetadata->rowCount;
|
||||
}
|
||||
|
||||
/*
|
||||
* We allocate all stripe specific data in the stripeReadContext, and reset
|
||||
* this memory context before loading a new stripe. This is to avoid memory
|
||||
|
@ -177,7 +167,6 @@ ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
|
|||
|
||||
ColumnarReadState *readState = palloc0(sizeof(ColumnarReadState));
|
||||
readState->relation = relation;
|
||||
readState->stripeList = stripeList;
|
||||
readState->projectedColumnList = projectedColumnList;
|
||||
readState->whereClauseList = whereClauseList;
|
||||
readState->whereClauseVars = GetClauseVars(whereClauseList, tupleDescriptor->natts);
|
||||
|
@ -185,6 +174,9 @@ ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
|
|||
readState->tupleDescriptor = tupleDescriptor;
|
||||
readState->stripeReadContext = stripeReadContext;
|
||||
readState->stripeReadState = NULL;
|
||||
readState->currentStripeMetadata = FindNextStripeByRowNumber(relation,
|
||||
COLUMNAR_INVALID_ROW_NUMBER,
|
||||
GetTransactionSnapshot());
|
||||
|
||||
return readState;
|
||||
}
|
||||
|
@ -220,9 +212,7 @@ ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *col
|
|||
return false;
|
||||
}
|
||||
|
||||
StripeMetadata *stripeMetadata = list_nth(readState->stripeList,
|
||||
readState->currentStripe);
|
||||
readState->stripeReadState = BeginStripeRead(stripeMetadata,
|
||||
readState->stripeReadState = BeginStripeRead(readState->currentStripeMetadata,
|
||||
readState->relation,
|
||||
readState->tupleDescriptor,
|
||||
readState->projectedColumnList,
|
||||
|
@ -239,9 +229,7 @@ ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *col
|
|||
|
||||
if (rowNumber)
|
||||
{
|
||||
StripeMetadata *stripeMetadata = list_nth(readState->stripeList,
|
||||
readState->currentStripe);
|
||||
*rowNumber = stripeMetadata->firstRowNumber +
|
||||
*rowNumber = readState->currentStripeMetadata->firstRowNumber +
|
||||
readState->stripeReadState->currentRow - 1;
|
||||
}
|
||||
|
||||
|
@ -367,8 +355,7 @@ StripeReadInProgress(ColumnarReadState *readState)
|
|||
static bool
|
||||
HasUnreadStripe(ColumnarReadState *readState)
|
||||
{
|
||||
uint32 stripeCount = list_length(readState->stripeList);
|
||||
return readState->currentStripe < stripeCount;
|
||||
return readState->currentStripeMetadata != NULL;
|
||||
}
|
||||
|
||||
|
||||
|
@ -380,7 +367,9 @@ void
|
|||
ColumnarRescan(ColumnarReadState *readState)
|
||||
{
|
||||
readState->stripeReadState = NULL;
|
||||
readState->currentStripe = 0;
|
||||
readState->currentStripeMetadata = FindNextStripeByRowNumber(readState->relation,
|
||||
COLUMNAR_INVALID_ROW_NUMBER,
|
||||
GetTransactionSnapshot());
|
||||
readState->chunkGroupsFiltered = 0;
|
||||
}
|
||||
|
||||
|
@ -392,7 +381,11 @@ void
|
|||
ColumnarEndRead(ColumnarReadState *readState)
|
||||
{
|
||||
MemoryContextDelete(readState->stripeReadContext);
|
||||
list_free_deep(readState->stripeList);
|
||||
if (readState->currentStripeMetadata)
|
||||
{
|
||||
pfree(readState->currentStripeMetadata);
|
||||
}
|
||||
|
||||
pfree(readState);
|
||||
}
|
||||
|
||||
|
@ -445,17 +438,23 @@ EndStripeRead(StripeReadState *stripeReadState)
|
|||
|
||||
|
||||
/*
|
||||
* AdvanceStripeRead updates chunkGroupsFiltered and increments currentStripe
|
||||
* for next stripe read.
|
||||
* AdvanceStripeRead updates chunkGroupsFiltered and sets
|
||||
* currentStripeMetadata for next stripe read.
|
||||
*/
|
||||
static void
|
||||
AdvanceStripeRead(ColumnarReadState *readState)
|
||||
{
|
||||
readState->chunkGroupsFiltered +=
|
||||
readState->stripeReadState->chunkGroupsFiltered;
|
||||
|
||||
uint64 lastReadRowNumber =
|
||||
StripeGetHighestRowNumber(readState->currentStripeMetadata);
|
||||
|
||||
EndStripeRead(readState->stripeReadState);
|
||||
|
||||
readState->currentStripe++;
|
||||
readState->currentStripeMetadata = FindNextStripeByRowNumber(readState->relation,
|
||||
lastReadRowNumber,
|
||||
GetTransactionSnapshot());
|
||||
readState->stripeReadState = NULL;
|
||||
MemoryContextReset(readState->stripeReadContext);
|
||||
}
|
||||
|
|
|
@ -1295,8 +1295,7 @@ ColumnarGetHighestItemPointer(Relation relation, Snapshot snapshot)
|
|||
return invalidItemPtr;
|
||||
}
|
||||
|
||||
uint64 highestRowNumber = stripeWithHighestRowNumber->firstRowNumber +
|
||||
stripeWithHighestRowNumber->rowCount - 1;
|
||||
uint64 highestRowNumber = StripeGetHighestRowNumber(stripeWithHighestRowNumber);
|
||||
return row_number_to_tid(highestRowNumber);
|
||||
}
|
||||
|
||||
|
|
|
@ -254,8 +254,11 @@ extern void SaveChunkGroups(RelFileNode relfilenode, uint64 stripe,
|
|||
extern StripeSkipList * ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe,
|
||||
TupleDesc tupleDescriptor,
|
||||
uint32 chunkCount);
|
||||
extern StripeMetadata * FindNextStripeByRowNumber(Relation relation, uint64 rowNumber,
|
||||
Snapshot snapshot);
|
||||
extern StripeMetadata * FindStripeByRowNumber(Relation relation, uint64 rowNumber,
|
||||
Snapshot snapshot);
|
||||
extern uint64 StripeGetHighestRowNumber(StripeMetadata *stripeMetadata);
|
||||
extern StripeMetadata * FindStripeWithHighestRowNumber(Relation relation,
|
||||
Snapshot snapshot);
|
||||
extern Datum columnar_relation_storageid(PG_FUNCTION_ARGS);
|
||||
|
|
Loading…
Reference in New Issue