mirror of https://github.com/citusdata/citus.git
Remove stripeList & currentStripe from ColumnarReadState
parent
16dee73b10
commit
8942086506
|
@ -73,7 +73,12 @@ typedef enum RowNumberLookupMode
|
||||||
* Find the stripe whose firstRowNumber is less than or equal to given
|
* Find the stripe whose firstRowNumber is less than or equal to given
|
||||||
* input rowNumber.
|
* input rowNumber.
|
||||||
*/
|
*/
|
||||||
FIND_LESS_OR_EQUAL
|
FIND_LESS_OR_EQUAL,
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Find the stripe whose firstRowNumber is greater than input rowNumber.
|
||||||
|
*/
|
||||||
|
FIND_GREATER
|
||||||
} RowNumberLookupMode;
|
} RowNumberLookupMode;
|
||||||
|
|
||||||
static void InsertStripeMetadataRow(uint64 storageId, StripeMetadata *stripe);
|
static void InsertStripeMetadataRow(uint64 storageId, StripeMetadata *stripe);
|
||||||
|
@ -634,6 +639,18 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* FindStripeByRowNumber returns StripeMetadata for the stripe whose
|
||||||
|
* firstRowNumber is greater than given rowNumber. If no such stripe
|
||||||
|
* exists, then returns NULL.
|
||||||
|
*/
|
||||||
|
StripeMetadata *
|
||||||
|
FindNextStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot)
|
||||||
|
{
|
||||||
|
return StripeMetadataLookupRowNumber(relation, rowNumber, snapshot, FIND_GREATER);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* FindStripeByRowNumber returns StripeMetadata for the stripe that contains
|
* FindStripeByRowNumber returns StripeMetadata for the stripe that contains
|
||||||
* the row with rowNumber. If no such stripe exists, then returns NULL.
|
* the row with rowNumber. If no such stripe exists, then returns NULL.
|
||||||
|
@ -662,15 +679,16 @@ FindStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* StripeMetadataLookupRowNumber returns StripeMetadata for the stripe whose
|
* StripeMetadataLookupRowNumber returns StripeMetadata for the stripe whose
|
||||||
* firstRowNumber is less than or equal to (FIND_LESS_OR_EQUAL) given rowNumber
|
* firstRowNumber is less than or equal to (FIND_LESS_OR_EQUAL), or is
|
||||||
* by doing backward index scan on stripe_first_row_number_idx.
|
* greater than (FIND_GREATER) given rowNumber by doing backward index
|
||||||
|
* scan on stripe_first_row_number_idx.
|
||||||
* If no such stripe exists, then returns NULL.
|
* If no such stripe exists, then returns NULL.
|
||||||
*/
|
*/
|
||||||
static StripeMetadata *
|
static StripeMetadata *
|
||||||
StripeMetadataLookupRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot,
|
StripeMetadataLookupRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot,
|
||||||
RowNumberLookupMode lookupMode)
|
RowNumberLookupMode lookupMode)
|
||||||
{
|
{
|
||||||
Assert(lookupMode == FIND_LESS_OR_EQUAL);
|
Assert(lookupMode == FIND_LESS_OR_EQUAL || lookupMode == FIND_GREATER);
|
||||||
|
|
||||||
StripeMetadata *foundStripeMetadata = NULL;
|
StripeMetadata *foundStripeMetadata = NULL;
|
||||||
|
|
||||||
|
@ -686,6 +704,11 @@ StripeMetadataLookupRowNumber(Relation relation, uint64 rowNumber, Snapshot snap
|
||||||
strategyNumber = BTLessEqualStrategyNumber;
|
strategyNumber = BTLessEqualStrategyNumber;
|
||||||
procedure = F_INT8LE;
|
procedure = F_INT8LE;
|
||||||
}
|
}
|
||||||
|
else if (lookupMode == FIND_GREATER)
|
||||||
|
{
|
||||||
|
strategyNumber = BTGreaterStrategyNumber;
|
||||||
|
procedure = F_INT8GT;
|
||||||
|
}
|
||||||
ScanKeyInit(&scanKey[1], Anum_columnar_stripe_first_row_number,
|
ScanKeyInit(&scanKey[1], Anum_columnar_stripe_first_row_number,
|
||||||
strategyNumber, procedure, UInt64GetDatum(rowNumber));
|
strategyNumber, procedure, UInt64GetDatum(rowNumber));
|
||||||
|
|
||||||
|
@ -702,6 +725,10 @@ StripeMetadataLookupRowNumber(Relation relation, uint64 rowNumber, Snapshot snap
|
||||||
{
|
{
|
||||||
scanDirection = BackwardScanDirection;
|
scanDirection = BackwardScanDirection;
|
||||||
}
|
}
|
||||||
|
else if (lookupMode == FIND_GREATER)
|
||||||
|
{
|
||||||
|
scanDirection = ForwardScanDirection;
|
||||||
|
}
|
||||||
HeapTuple heapTuple = systable_getnext_ordered(scanDescriptor, scanDirection);
|
HeapTuple heapTuple = systable_getnext_ordered(scanDescriptor, scanDirection);
|
||||||
if (HeapTupleIsValid(heapTuple))
|
if (HeapTupleIsValid(heapTuple))
|
||||||
{
|
{
|
||||||
|
|
|
@ -64,11 +64,10 @@ typedef struct StripeReadState
|
||||||
|
|
||||||
struct ColumnarReadState
|
struct ColumnarReadState
|
||||||
{
|
{
|
||||||
List *stripeList;
|
|
||||||
TupleDesc tupleDescriptor;
|
TupleDesc tupleDescriptor;
|
||||||
Relation relation;
|
Relation relation;
|
||||||
|
|
||||||
int64 currentStripe; /* index of current stripe */
|
StripeMetadata *currentStripeMetadata;
|
||||||
StripeReadState *stripeReadState;
|
StripeReadState *stripeReadState;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -159,15 +158,6 @@ ColumnarReadState *
|
||||||
ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
|
ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
|
||||||
List *projectedColumnList, List *whereClauseList)
|
List *projectedColumnList, List *whereClauseList)
|
||||||
{
|
{
|
||||||
List *stripeList = StripesForRelfilenode(relation->rd_node);
|
|
||||||
StripeMetadata *stripeMetadata = NULL;
|
|
||||||
|
|
||||||
uint64 totalRowCount = 0;
|
|
||||||
foreach_ptr(stripeMetadata, stripeList)
|
|
||||||
{
|
|
||||||
totalRowCount += stripeMetadata->rowCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We allocate all stripe specific data in the stripeReadContext, and reset
|
* We allocate all stripe specific data in the stripeReadContext, and reset
|
||||||
* this memory context before loading a new stripe. This is to avoid memory
|
* this memory context before loading a new stripe. This is to avoid memory
|
||||||
|
@ -177,7 +167,6 @@ ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
|
||||||
|
|
||||||
ColumnarReadState *readState = palloc0(sizeof(ColumnarReadState));
|
ColumnarReadState *readState = palloc0(sizeof(ColumnarReadState));
|
||||||
readState->relation = relation;
|
readState->relation = relation;
|
||||||
readState->stripeList = stripeList;
|
|
||||||
readState->projectedColumnList = projectedColumnList;
|
readState->projectedColumnList = projectedColumnList;
|
||||||
readState->whereClauseList = whereClauseList;
|
readState->whereClauseList = whereClauseList;
|
||||||
readState->whereClauseVars = GetClauseVars(whereClauseList, tupleDescriptor->natts);
|
readState->whereClauseVars = GetClauseVars(whereClauseList, tupleDescriptor->natts);
|
||||||
|
@ -185,6 +174,9 @@ ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
|
||||||
readState->tupleDescriptor = tupleDescriptor;
|
readState->tupleDescriptor = tupleDescriptor;
|
||||||
readState->stripeReadContext = stripeReadContext;
|
readState->stripeReadContext = stripeReadContext;
|
||||||
readState->stripeReadState = NULL;
|
readState->stripeReadState = NULL;
|
||||||
|
readState->currentStripeMetadata = FindNextStripeByRowNumber(relation,
|
||||||
|
COLUMNAR_INVALID_ROW_NUMBER,
|
||||||
|
GetTransactionSnapshot());
|
||||||
|
|
||||||
return readState;
|
return readState;
|
||||||
}
|
}
|
||||||
|
@ -220,9 +212,7 @@ ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *col
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
StripeMetadata *stripeMetadata = list_nth(readState->stripeList,
|
readState->stripeReadState = BeginStripeRead(readState->currentStripeMetadata,
|
||||||
readState->currentStripe);
|
|
||||||
readState->stripeReadState = BeginStripeRead(stripeMetadata,
|
|
||||||
readState->relation,
|
readState->relation,
|
||||||
readState->tupleDescriptor,
|
readState->tupleDescriptor,
|
||||||
readState->projectedColumnList,
|
readState->projectedColumnList,
|
||||||
|
@ -239,9 +229,7 @@ ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *col
|
||||||
|
|
||||||
if (rowNumber)
|
if (rowNumber)
|
||||||
{
|
{
|
||||||
StripeMetadata *stripeMetadata = list_nth(readState->stripeList,
|
*rowNumber = readState->currentStripeMetadata->firstRowNumber +
|
||||||
readState->currentStripe);
|
|
||||||
*rowNumber = stripeMetadata->firstRowNumber +
|
|
||||||
readState->stripeReadState->currentRow - 1;
|
readState->stripeReadState->currentRow - 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -367,8 +355,7 @@ StripeReadInProgress(ColumnarReadState *readState)
|
||||||
static bool
|
static bool
|
||||||
HasUnreadStripe(ColumnarReadState *readState)
|
HasUnreadStripe(ColumnarReadState *readState)
|
||||||
{
|
{
|
||||||
uint32 stripeCount = list_length(readState->stripeList);
|
return readState->currentStripeMetadata != NULL;
|
||||||
return readState->currentStripe < stripeCount;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -380,7 +367,9 @@ void
|
||||||
ColumnarRescan(ColumnarReadState *readState)
|
ColumnarRescan(ColumnarReadState *readState)
|
||||||
{
|
{
|
||||||
readState->stripeReadState = NULL;
|
readState->stripeReadState = NULL;
|
||||||
readState->currentStripe = 0;
|
readState->currentStripeMetadata = FindNextStripeByRowNumber(readState->relation,
|
||||||
|
COLUMNAR_INVALID_ROW_NUMBER,
|
||||||
|
GetTransactionSnapshot());
|
||||||
readState->chunkGroupsFiltered = 0;
|
readState->chunkGroupsFiltered = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -392,7 +381,11 @@ void
|
||||||
ColumnarEndRead(ColumnarReadState *readState)
|
ColumnarEndRead(ColumnarReadState *readState)
|
||||||
{
|
{
|
||||||
MemoryContextDelete(readState->stripeReadContext);
|
MemoryContextDelete(readState->stripeReadContext);
|
||||||
list_free_deep(readState->stripeList);
|
if (readState->currentStripeMetadata)
|
||||||
|
{
|
||||||
|
pfree(readState->currentStripeMetadata);
|
||||||
|
}
|
||||||
|
|
||||||
pfree(readState);
|
pfree(readState);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -445,17 +438,23 @@ EndStripeRead(StripeReadState *stripeReadState)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* AdvanceStripeRead updates chunkGroupsFiltered and increments currentStripe
|
* AdvanceStripeRead updates chunkGroupsFiltered and sets
|
||||||
* for next stripe read.
|
* currentStripeMetadata for next stripe read.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
AdvanceStripeRead(ColumnarReadState *readState)
|
AdvanceStripeRead(ColumnarReadState *readState)
|
||||||
{
|
{
|
||||||
readState->chunkGroupsFiltered +=
|
readState->chunkGroupsFiltered +=
|
||||||
readState->stripeReadState->chunkGroupsFiltered;
|
readState->stripeReadState->chunkGroupsFiltered;
|
||||||
|
|
||||||
|
uint64 lastReadRowNumber = readState->currentStripeMetadata->firstRowNumber +
|
||||||
|
readState->currentStripeMetadata->rowCount - 1;
|
||||||
|
|
||||||
EndStripeRead(readState->stripeReadState);
|
EndStripeRead(readState->stripeReadState);
|
||||||
|
|
||||||
readState->currentStripe++;
|
readState->currentStripeMetadata = FindNextStripeByRowNumber(readState->relation,
|
||||||
|
lastReadRowNumber,
|
||||||
|
GetTransactionSnapshot());
|
||||||
readState->stripeReadState = NULL;
|
readState->stripeReadState = NULL;
|
||||||
MemoryContextReset(readState->stripeReadContext);
|
MemoryContextReset(readState->stripeReadContext);
|
||||||
}
|
}
|
||||||
|
|
|
@ -254,6 +254,8 @@ extern void SaveChunkGroups(RelFileNode relfilenode, uint64 stripe,
|
||||||
extern StripeSkipList * ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe,
|
extern StripeSkipList * ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe,
|
||||||
TupleDesc tupleDescriptor,
|
TupleDesc tupleDescriptor,
|
||||||
uint32 chunkCount);
|
uint32 chunkCount);
|
||||||
|
extern StripeMetadata * FindNextStripeByRowNumber(Relation relation, uint64 rowNumber,
|
||||||
|
Snapshot snapshot);
|
||||||
extern StripeMetadata * FindStripeByRowNumber(Relation relation, uint64 rowNumber,
|
extern StripeMetadata * FindStripeByRowNumber(Relation relation, uint64 rowNumber,
|
||||||
Snapshot snapshot);
|
Snapshot snapshot);
|
||||||
extern StripeMetadata * FindStripeWithHighestRowNumber(Relation relation,
|
extern StripeMetadata * FindStripeWithHighestRowNumber(Relation relation,
|
||||||
|
|
Loading…
Reference in New Issue