mirror of https://github.com/citusdata/citus.git
Refactor ColumnarReadNextRow for better readability (#4823)
parent
3efdfdd791
commit
716cc629f1
|
@ -83,11 +83,14 @@ struct ColumnarReadState
|
||||||
};
|
};
|
||||||
|
|
||||||
/* static function declarations */
|
/* static function declarations */
|
||||||
|
static bool StripeReadInProgress(ColumnarReadState *readState);
|
||||||
|
static bool HasUnreadStripe(ColumnarReadState *readState);
|
||||||
static StripeReadState * BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel,
|
static StripeReadState * BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel,
|
||||||
TupleDesc tupleDesc, List *projectedColumnList,
|
TupleDesc tupleDesc, List *projectedColumnList,
|
||||||
List *whereClauseList, List *whereClauseVars,
|
List *whereClauseList, List *whereClauseVars,
|
||||||
MemoryContext stripeReadContext);
|
MemoryContext stripeReadContext);
|
||||||
static void EndStripeRead(StripeReadState *stripeReadState);
|
static void EndStripeRead(StripeReadState *stripeReadState);
|
||||||
|
static void AdvanceStripeRead(ColumnarReadState *readState);
|
||||||
static bool ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues,
|
static bool ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues,
|
||||||
bool *columnNulls);
|
bool *columnNulls);
|
||||||
static ChunkGroupReadState * BeginChunkGroupRead(StripeBuffers *stripeBuffers, int
|
static ChunkGroupReadState * BeginChunkGroupRead(StripeBuffers *stripeBuffers, int
|
||||||
|
@ -188,18 +191,15 @@ ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *col
|
||||||
{
|
{
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
if (readState->stripeReadState == NULL)
|
if (!StripeReadInProgress(readState))
|
||||||
{
|
{
|
||||||
uint32 stripeCount = list_length(readState->stripeList);
|
if (!HasUnreadStripe(readState))
|
||||||
|
|
||||||
if (readState->currentStripe >= stripeCount)
|
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
StripeMetadata *stripeMetadata = list_nth(readState->stripeList,
|
StripeMetadata *stripeMetadata = list_nth(readState->stripeList,
|
||||||
readState->currentStripe);
|
readState->currentStripe);
|
||||||
|
|
||||||
readState->stripeReadState = BeginStripeRead(stripeMetadata,
|
readState->stripeReadState = BeginStripeRead(stripeMetadata,
|
||||||
readState->relation,
|
readState->relation,
|
||||||
readState->tupleDescriptor,
|
readState->tupleDescriptor,
|
||||||
|
@ -211,13 +211,8 @@ ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *col
|
||||||
|
|
||||||
if (!ReadStripeNextRow(readState->stripeReadState, columnValues, columnNulls))
|
if (!ReadStripeNextRow(readState->stripeReadState, columnValues, columnNulls))
|
||||||
{
|
{
|
||||||
readState->chunkGroupsFiltered +=
|
|
||||||
readState->stripeReadState->chunkGroupsFiltered;
|
|
||||||
readState->currentStripe++;
|
|
||||||
EndStripeRead(readState->stripeReadState);
|
EndStripeRead(readState->stripeReadState);
|
||||||
readState->stripeReadState = NULL;
|
AdvanceStripeRead(readState);
|
||||||
MemoryContextReset(readState->stripeReadContext);
|
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -228,6 +223,28 @@ ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *col
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* StripeReadInProgress returns true if we already started reading a stripe.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
StripeReadInProgress(ColumnarReadState *readState)
|
||||||
|
{
|
||||||
|
return readState->stripeReadState != NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* HasUnreadStripe returns true if we still have stripes to read during current
|
||||||
|
* read operation.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
HasUnreadStripe(ColumnarReadState *readState)
|
||||||
|
{
|
||||||
|
uint32 stripeCount = list_length(readState->stripeList);
|
||||||
|
return readState->currentStripe < stripeCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ColumnarRescan clears the position where we were scanning so that the next read starts at
|
* ColumnarRescan clears the position where we were scanning so that the next read starts at
|
||||||
* the beginning again
|
* the beginning again
|
||||||
|
@ -300,6 +317,21 @@ EndStripeRead(StripeReadState *stripeReadState)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* AdvanceStripeRead updates chunkGroupsFiltered and increments currentStripe
|
||||||
|
* for next stripe read.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
AdvanceStripeRead(ColumnarReadState *readState)
|
||||||
|
{
|
||||||
|
readState->chunkGroupsFiltered +=
|
||||||
|
readState->stripeReadState->chunkGroupsFiltered;
|
||||||
|
readState->currentStripe++;
|
||||||
|
readState->stripeReadState = NULL;
|
||||||
|
MemoryContextReset(readState->stripeReadContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ReadStripeNextRow: If more rows can be read from the current stripe, fill
|
* ReadStripeNextRow: If more rows can be read from the current stripe, fill
|
||||||
* in non-NULL columnValues and return true. Otherwise, return false.
|
* in non-NULL columnValues and return true. Otherwise, return false.
|
||||||
|
|
Loading…
Reference in New Issue