mirror of https://github.com/citusdata/citus.git
Columnar: refactor read path and fix zero-column tables. (#4668)
Co-authored-by: Jeff Davis <jefdavi@microsoft.com>
pull/4748/head
parent
5ebd4eac7f
commit
fbeb747006
|
@ -40,13 +40,39 @@
|
|||
#include "columnar/columnar.h"
|
||||
#include "columnar/columnar_version_compat.h"
|
||||
|
||||
typedef struct ChunkGroupReadState
|
||||
{
|
||||
int64 currentRow;
|
||||
int64 rowCount;
|
||||
int columnCount;
|
||||
List *projectedColumnList; /* borrowed reference */
|
||||
ChunkData *chunkGroupData;
|
||||
} ChunkGroupReadState;
|
||||
|
||||
typedef struct StripeReadState
|
||||
{
|
||||
int columnCount;
|
||||
int64 rowCount;
|
||||
int64 currentRow;
|
||||
TupleDesc tupleDescriptor;
|
||||
Relation relation;
|
||||
int chunkGroupIndex;
|
||||
int64 chunkGroupsFiltered;
|
||||
MemoryContext stripeReadContext;
|
||||
StripeBuffers *stripeBuffers; /* allocated in stripeReadContext */
|
||||
List *projectedColumnList; /* borrowed reference */
|
||||
ChunkGroupReadState *chunkGroupReadState; /* owned */
|
||||
} StripeReadState;
|
||||
|
||||
struct TableReadState
|
||||
{
|
||||
List *stripeList;
|
||||
StripeMetadata *currentStripeMetadata;
|
||||
TupleDesc tupleDescriptor;
|
||||
Relation relation;
|
||||
|
||||
int64 currentStripe; /* index of current stripe */
|
||||
StripeReadState *stripeReadState;
|
||||
|
||||
/*
|
||||
* Following are used for tables with zero columns, or when no
|
||||
* columns are projected.
|
||||
|
@ -63,25 +89,32 @@ struct TableReadState
|
|||
|
||||
List *whereClauseList;
|
||||
MemoryContext stripeReadContext;
|
||||
StripeBuffers *stripeBuffers;
|
||||
uint32 readStripeCount;
|
||||
uint64 stripeReadRowCount;
|
||||
int64 chunkGroupsFiltered;
|
||||
ChunkData *chunkData;
|
||||
int32 deserializedChunkIndex;
|
||||
};
|
||||
|
||||
/* static function declarations */
|
||||
static StripeReadState * BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel,
|
||||
TupleDesc tupleDesc, List *projectedColumnList,
|
||||
List *whereClauseList, MemoryContext
|
||||
stripeReadContext);
|
||||
static void EndStripeRead(StripeReadState *stripeReadState);
|
||||
static bool ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues,
|
||||
bool *columnNulls);
|
||||
static ChunkGroupReadState * BeginChunkGroupRead(StripeBuffers *stripeBuffers, int
|
||||
chunkIndex,
|
||||
TupleDesc tupleDesc,
|
||||
List *projectedColumnList,
|
||||
MemoryContext cxt);
|
||||
static void EndChunkGroupRead(ChunkGroupReadState *chunkGroupReadState);
|
||||
static bool ReadChunkGroupNextRow(ChunkGroupReadState *chunkGroupReadState,
|
||||
Datum *columnValues,
|
||||
bool *columnNulls);
|
||||
static StripeBuffers * LoadFilteredStripeBuffers(Relation relation,
|
||||
StripeMetadata *stripeMetadata,
|
||||
TupleDesc tupleDescriptor,
|
||||
List *projectedColumnList,
|
||||
List *whereClauseList,
|
||||
int64 *chunkGroupsFiltered);
|
||||
static void ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColumnList,
|
||||
uint64 chunkIndex, uint64 chunkRowIndex,
|
||||
ChunkData *chunkData, Datum *columnValues,
|
||||
bool *columnNulls);
|
||||
static ColumnBuffers * LoadColumnBuffers(Relation relation,
|
||||
ColumnChunkSkipNode *chunkSkipNodeArray,
|
||||
uint32 chunkCount, uint64 stripeOffset,
|
||||
|
@ -142,16 +175,10 @@ ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
|
|||
readState->stripeList = stripeList;
|
||||
readState->projectedColumnList = projectedColumnList;
|
||||
readState->whereClauseList = whereClauseList;
|
||||
readState->stripeBuffers = NULL;
|
||||
readState->readStripeCount = 0;
|
||||
readState->stripeReadRowCount = 0;
|
||||
readState->chunkGroupsFiltered = 0;
|
||||
readState->tupleDescriptor = tupleDescriptor;
|
||||
readState->stripeReadContext = stripeReadContext;
|
||||
readState->chunkData = NULL;
|
||||
readState->deserializedChunkIndex = -1;
|
||||
readState->readRowCount = 0;
|
||||
readState->totalRowCount = totalRowCount;
|
||||
readState->stripeReadState = NULL;
|
||||
|
||||
return readState;
|
||||
}
|
||||
|
@ -165,111 +192,44 @@ ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
|
|||
bool
|
||||
ColumnarReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNulls)
|
||||
{
|
||||
StripeMetadata *stripeMetadata = readState->currentStripeMetadata;
|
||||
MemoryContext oldContext = NULL;
|
||||
|
||||
/*
|
||||
* We rely on first column's metadata in rest of this function. So for zero
|
||||
* column tables we just return "true" for totalRowCount times. We do the
|
||||
* same when no columns are projected.
|
||||
*/
|
||||
if (readState->projectedColumnList == NIL)
|
||||
while (true)
|
||||
{
|
||||
if (readState->totalRowCount == readState->readRowCount)
|
||||
if (readState->stripeReadState == NULL)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
int columnCount = readState->tupleDescriptor->natts;
|
||||
memset(columnNulls, 1, sizeof(bool) * columnCount);
|
||||
readState->readRowCount++;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
uint32 stripeCount = list_length(readState->stripeList);
|
||||
|
||||
/*
|
||||
* If no stripes are loaded, load the next non-empty stripe. Note that when
|
||||
* loading stripes, we skip over chunks whose contents can be filtered with
|
||||
* the query's restriction qualifiers. So, even when a stripe is physically
|
||||
* not empty, we may end up loading it as an empty stripe.
|
||||
*/
|
||||
while (readState->stripeBuffers == NULL)
|
||||
{
|
||||
List *stripeMetadataList = readState->stripeList;
|
||||
uint32 stripeCount = list_length(stripeMetadataList);
|
||||
if (readState->currentStripe >= stripeCount)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/* if we have read all stripes, return false */
|
||||
if (readState->readStripeCount == stripeCount)
|
||||
{
|
||||
return false;
|
||||
StripeMetadata *stripeMetadata = list_nth(readState->stripeList,
|
||||
readState->currentStripe);
|
||||
|
||||
readState->stripeReadState = BeginStripeRead(stripeMetadata,
|
||||
readState->relation,
|
||||
readState->tupleDescriptor,
|
||||
readState->projectedColumnList,
|
||||
readState->whereClauseList,
|
||||
readState->stripeReadContext);
|
||||
}
|
||||
|
||||
oldContext = MemoryContextSwitchTo(readState->stripeReadContext);
|
||||
MemoryContextReset(readState->stripeReadContext);
|
||||
readState->chunkData = NULL;
|
||||
|
||||
stripeMetadata = list_nth(stripeMetadataList, readState->readStripeCount);
|
||||
StripeBuffers *stripeBuffers = LoadFilteredStripeBuffers(readState->relation,
|
||||
stripeMetadata,
|
||||
readState->
|
||||
tupleDescriptor,
|
||||
readState->
|
||||
projectedColumnList,
|
||||
readState->
|
||||
whereClauseList,
|
||||
&readState->
|
||||
chunkGroupsFiltered);
|
||||
readState->readStripeCount++;
|
||||
readState->currentStripeMetadata = stripeMetadata;
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
if (stripeBuffers->rowCount != 0)
|
||||
if (!ReadStripeNextRow(readState->stripeReadState, columnValues, columnNulls))
|
||||
{
|
||||
readState->stripeBuffers = stripeBuffers;
|
||||
readState->stripeReadRowCount = 0;
|
||||
readState->deserializedChunkIndex = -1;
|
||||
break;
|
||||
readState->chunkGroupsFiltered +=
|
||||
readState->stripeReadState->chunkGroupsFiltered;
|
||||
readState->currentStripe++;
|
||||
EndStripeRead(readState->stripeReadState);
|
||||
readState->stripeReadState = NULL;
|
||||
MemoryContextReset(readState->stripeReadContext);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
uint32 chunkIndex = readState->stripeReadRowCount / stripeMetadata->chunkRowCount;
|
||||
uint32 chunkRowIndex = readState->stripeReadRowCount % stripeMetadata->chunkRowCount;
|
||||
|
||||
if (chunkIndex != readState->deserializedChunkIndex)
|
||||
{
|
||||
uint32 chunkRowCount =
|
||||
readState->stripeBuffers->selectedChunkRowCount[chunkIndex];
|
||||
|
||||
oldContext = MemoryContextSwitchTo(readState->stripeReadContext);
|
||||
|
||||
FreeChunkData(readState->chunkData);
|
||||
readState->chunkData =
|
||||
DeserializeChunkData(readState->stripeBuffers, chunkIndex,
|
||||
chunkRowCount, readState->tupleDescriptor,
|
||||
readState->projectedColumnList);
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
readState->deserializedChunkIndex = chunkIndex;
|
||||
}
|
||||
|
||||
ReadStripeNextRow(readState->stripeBuffers, readState->projectedColumnList,
|
||||
chunkIndex, chunkRowIndex, readState->chunkData,
|
||||
columnValues, columnNulls);
|
||||
|
||||
/*
|
||||
* If we finished reading the current stripe, set stripe data to NULL. That
|
||||
* way, we will load a new stripe the next time this function gets called.
|
||||
*/
|
||||
readState->stripeReadRowCount++;
|
||||
if (readState->stripeReadRowCount == readState->stripeBuffers->rowCount)
|
||||
{
|
||||
readState->stripeBuffers = NULL;
|
||||
}
|
||||
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
|
@ -280,13 +240,15 @@ ColumnarReadNextRow(TableReadState *readState, Datum *columnValues, bool *column
|
|||
void
|
||||
ColumnarRescan(TableReadState *readState)
|
||||
{
|
||||
readState->stripeBuffers = NULL;
|
||||
readState->readStripeCount = 0;
|
||||
readState->stripeReadRowCount = 0;
|
||||
readState->stripeReadState = NULL;
|
||||
readState->currentStripe = 0;
|
||||
readState->chunkGroupsFiltered = 0;
|
||||
}
|
||||
|
||||
|
||||
/* Finishes a columnar read operation. */
|
||||
/*
|
||||
* Finishes a columnar read operation.
|
||||
*/
|
||||
void
|
||||
ColumnarEndRead(TableReadState *readState)
|
||||
{
|
||||
|
@ -296,6 +258,226 @@ ColumnarEndRead(TableReadState *readState)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* BeginStripeRead allocates state for reading a stripe.
|
||||
*/
|
||||
static StripeReadState *
|
||||
BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDesc,
|
||||
List *projectedColumnList, List *whereClauseList, MemoryContext
|
||||
stripeReadContext)
|
||||
{
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(stripeReadContext);
|
||||
|
||||
StripeReadState *stripeReadState = palloc0(sizeof(StripeReadState));
|
||||
|
||||
stripeReadState->relation = rel;
|
||||
stripeReadState->tupleDescriptor = tupleDesc;
|
||||
stripeReadState->columnCount = tupleDesc->natts;
|
||||
stripeReadState->chunkGroupReadState = NULL;
|
||||
stripeReadState->projectedColumnList = projectedColumnList;
|
||||
stripeReadState->stripeReadContext = stripeReadContext;
|
||||
|
||||
/*
|
||||
* If there are no attributes in the table at all, reading the chunk
|
||||
* groups will fail (because there are no chunks), so we must introduce a
|
||||
* special case. Also follow this special case if no attributes are
|
||||
* projected, so that we won't have to deal with deleted attributes,
|
||||
* either.
|
||||
*
|
||||
* TODO: refactor metadata so that chunk groups hold the row count; rather
|
||||
* than individual chunks (which is repetitive in the normal case, and
|
||||
* problematic in the case where there are zero columns).
|
||||
*/
|
||||
if (list_length(projectedColumnList) != 0)
|
||||
{
|
||||
stripeReadState->stripeBuffers = LoadFilteredStripeBuffers(rel,
|
||||
stripeMetadata,
|
||||
tupleDesc,
|
||||
projectedColumnList,
|
||||
whereClauseList,
|
||||
&stripeReadState->
|
||||
chunkGroupsFiltered);
|
||||
|
||||
stripeReadState->rowCount = stripeReadState->stripeBuffers->rowCount;
|
||||
}
|
||||
else
|
||||
{
|
||||
stripeReadState->stripeBuffers = NULL;
|
||||
|
||||
/*
|
||||
* If there are no projected columns, then no chunks will be filtered,
|
||||
* so the row count is simply the stripe's row count.
|
||||
*/
|
||||
stripeReadState->rowCount = stripeMetadata->rowCount;
|
||||
}
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
|
||||
return stripeReadState;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* EndStripeRead finishes a stripe read.
|
||||
*/
|
||||
static void
|
||||
EndStripeRead(StripeReadState *stripeReadState)
|
||||
{
|
||||
pfree(stripeReadState);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ReadStripeNextRow: If more rows can be read from the current stripe, fill
|
||||
* in non-NULL columnValues and return true. Otherwise, return false.
|
||||
*
|
||||
* On entry, all entries in columnNulls should be true; this function only
|
||||
* sets non-NULL entries.
|
||||
*
|
||||
*/
|
||||
static bool
|
||||
ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues,
|
||||
bool *columnNulls)
|
||||
{
|
||||
if (stripeReadState->currentRow >= stripeReadState->rowCount)
|
||||
{
|
||||
Assert(stripeReadState->currentRow == stripeReadState->rowCount);
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* If there are no attributes in the table at all, stripeBuffers won't be
|
||||
* loaded so we just return rowCount empty tuples.
|
||||
*/
|
||||
if (stripeReadState->stripeBuffers == NULL)
|
||||
{
|
||||
if (stripeReadState->currentRow < stripeReadState->rowCount)
|
||||
{
|
||||
stripeReadState->currentRow++;
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
while (true)
|
||||
{
|
||||
if (stripeReadState->chunkGroupReadState == NULL)
|
||||
{
|
||||
stripeReadState->chunkGroupReadState = BeginChunkGroupRead(
|
||||
stripeReadState->stripeBuffers,
|
||||
stripeReadState->
|
||||
chunkGroupIndex,
|
||||
stripeReadState->
|
||||
tupleDescriptor,
|
||||
stripeReadState->
|
||||
projectedColumnList,
|
||||
stripeReadState->
|
||||
stripeReadContext);
|
||||
}
|
||||
|
||||
if (!ReadChunkGroupNextRow(stripeReadState->chunkGroupReadState, columnValues,
|
||||
columnNulls))
|
||||
{
|
||||
/* if this chunk group is exhausted, fetch the next one and loop */
|
||||
EndChunkGroupRead(stripeReadState->chunkGroupReadState);
|
||||
stripeReadState->chunkGroupReadState = NULL;
|
||||
stripeReadState->chunkGroupIndex++;
|
||||
continue;
|
||||
}
|
||||
|
||||
stripeReadState->currentRow++;
|
||||
return true;
|
||||
}
|
||||
|
||||
Assert(stripeReadState->currentRow == stripeReadState->rowCount);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* BeginChunkGroupRead allocates state for reading a chunk.
|
||||
*/
|
||||
static ChunkGroupReadState *
|
||||
BeginChunkGroupRead(StripeBuffers *stripeBuffers, int chunkIndex, TupleDesc tupleDesc,
|
||||
List *projectedColumnList, MemoryContext cxt)
|
||||
{
|
||||
uint32 chunkRowCount = stripeBuffers->selectedChunkRowCount[chunkIndex];
|
||||
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(cxt);
|
||||
|
||||
ChunkGroupReadState *chunkGroupReadState = palloc0(sizeof(ChunkGroupReadState));
|
||||
|
||||
chunkGroupReadState->currentRow = 0;
|
||||
chunkGroupReadState->rowCount = chunkRowCount;
|
||||
chunkGroupReadState->columnCount = tupleDesc->natts;
|
||||
chunkGroupReadState->projectedColumnList = projectedColumnList;
|
||||
|
||||
chunkGroupReadState->chunkGroupData = DeserializeChunkData(stripeBuffers, chunkIndex,
|
||||
chunkRowCount, tupleDesc,
|
||||
projectedColumnList);
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
return chunkGroupReadState;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* EndChunkRead finishes a chunk read.
|
||||
*/
|
||||
static void
|
||||
EndChunkGroupRead(ChunkGroupReadState *chunkGroupReadState)
|
||||
{
|
||||
FreeChunkData(chunkGroupReadState->chunkGroupData);
|
||||
pfree(chunkGroupReadState);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ReadChunkGroupNextRow: if more rows can be read from the current chunk
|
||||
* group, fill in non-NULL columnValues and return true. Otherwise, return
|
||||
* false.
|
||||
*
|
||||
* On entry, all entries in columnNulls should be true; this function only
|
||||
* sets non-NULL entries.
|
||||
*/
|
||||
static bool
|
||||
ReadChunkGroupNextRow(ChunkGroupReadState *chunkGroupReadState, Datum *columnValues,
|
||||
bool *columnNulls)
|
||||
{
|
||||
if (chunkGroupReadState->currentRow >= chunkGroupReadState->rowCount)
|
||||
{
|
||||
Assert(chunkGroupReadState->currentRow == chunkGroupReadState->rowCount);
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Initialize to all-NULL. Only non-NULL projected attributes will be set.
|
||||
*/
|
||||
memset(columnNulls, true, sizeof(bool) * chunkGroupReadState->columnCount);
|
||||
|
||||
Var *projectedColumn = NULL;
|
||||
foreach_ptr(projectedColumn, chunkGroupReadState->projectedColumnList)
|
||||
{
|
||||
const ChunkData *chunkGroupData = chunkGroupReadState->chunkGroupData;
|
||||
const int rowIndex = chunkGroupReadState->currentRow;
|
||||
uint32 columnIndex = projectedColumn->varattno - 1;
|
||||
|
||||
if (chunkGroupData->existsArray[columnIndex][rowIndex])
|
||||
{
|
||||
columnValues[columnIndex] = chunkGroupData->valueArray[columnIndex][rowIndex];
|
||||
columnNulls[columnIndex] = false;
|
||||
}
|
||||
}
|
||||
|
||||
chunkGroupReadState->currentRow++;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ColumnarReadChunkGroupsFiltered
|
||||
*
|
||||
|
@ -459,36 +641,6 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ReadStripeNextRow reads the next row from the given stripe, finds the projected
|
||||
* column values within this row, and accordingly sets the column values and nulls.
|
||||
* Note that this function sets the values for all non-projected columns to null.
|
||||
*/
|
||||
static void
|
||||
ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColumnList,
|
||||
uint64 chunkIndex, uint64 chunkRowIndex,
|
||||
ChunkData *chunkData, Datum *columnValues,
|
||||
bool *columnNulls)
|
||||
{
|
||||
ListCell *projectedColumnCell = NULL;
|
||||
|
||||
/* set all columns to null by default */
|
||||
memset(columnNulls, 1, stripeBuffers->columnCount * sizeof(bool));
|
||||
|
||||
foreach(projectedColumnCell, projectedColumnList)
|
||||
{
|
||||
Var *projectedColumn = lfirst(projectedColumnCell);
|
||||
uint32 columnIndex = projectedColumn->varattno - 1;
|
||||
|
||||
if (chunkData->existsArray[columnIndex][chunkRowIndex])
|
||||
{
|
||||
columnValues[columnIndex] = chunkData->valueArray[columnIndex][chunkRowIndex];
|
||||
columnNulls[columnIndex] = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* LoadColumnBuffers reads serialized column data from the given file. These
|
||||
* column data are laid out as sequential chunks in the file; and chunk positions
|
||||
|
@ -954,7 +1106,7 @@ DeserializeDatumArray(StringInfo datumBuffer, bool *existsArray, uint32 datumCou
|
|||
|
||||
|
||||
/*
|
||||
* DeserializeChunkData deserializes requested data chunk for all columns and
|
||||
* DeserializeChunkGroupData deserializes requested data chunk for all columns and
|
||||
* stores in chunkDataArray. It uncompresses serialized data if necessary. The
|
||||
* function also deallocates data buffers used for previous chunk, and compressed
|
||||
* data buffers for the current chunk which will not be needed again. If a column
|
||||
|
|
Loading…
Reference in New Issue