diff --git a/src/backend/columnar/cstore_reader.c b/src/backend/columnar/cstore_reader.c index 71e8fb845..3ee410229 100644 --- a/src/backend/columnar/cstore_reader.c +++ b/src/backend/columnar/cstore_reader.c @@ -40,13 +40,39 @@ #include "columnar/columnar.h" #include "columnar/columnar_version_compat.h" +typedef struct ChunkGroupReadState +{ + int64 currentRow; + int64 rowCount; + int columnCount; + List *projectedColumnList; /* borrowed reference */ + ChunkData *chunkGroupData; +} ChunkGroupReadState; + +typedef struct StripeReadState +{ + int columnCount; + int64 rowCount; + int64 currentRow; + TupleDesc tupleDescriptor; + Relation relation; + int chunkGroupIndex; + int64 chunkGroupsFiltered; + MemoryContext stripeReadContext; + StripeBuffers *stripeBuffers; /* allocated in stripeReadContext */ + List *projectedColumnList; /* borrowed reference */ + ChunkGroupReadState *chunkGroupReadState; /* owned */ +} StripeReadState; + struct TableReadState { List *stripeList; - StripeMetadata *currentStripeMetadata; TupleDesc tupleDescriptor; Relation relation; + int64 currentStripe; /* index of current stripe */ + StripeReadState *stripeReadState; + /* * Following are used for tables with zero columns, or when no * columns are projected. 
@@ -63,25 +89,32 @@ struct TableReadState List *whereClauseList; MemoryContext stripeReadContext; - StripeBuffers *stripeBuffers; - uint32 readStripeCount; - uint64 stripeReadRowCount; int64 chunkGroupsFiltered; - ChunkData *chunkData; - int32 deserializedChunkIndex; }; /* static function declarations */ +static StripeReadState * BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, + TupleDesc tupleDesc, List *projectedColumnList, + List *whereClauseList, MemoryContext + stripeReadContext); +static void EndStripeRead(StripeReadState *stripeReadState); +static bool ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues, + bool *columnNulls); +static ChunkGroupReadState * BeginChunkGroupRead(StripeBuffers *stripeBuffers, int + chunkIndex, + TupleDesc tupleDesc, + List *projectedColumnList, + MemoryContext cxt); +static void EndChunkGroupRead(ChunkGroupReadState *chunkGroupReadState); +static bool ReadChunkGroupNextRow(ChunkGroupReadState *chunkGroupReadState, + Datum *columnValues, + bool *columnNulls); static StripeBuffers * LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata, TupleDesc tupleDescriptor, List *projectedColumnList, List *whereClauseList, int64 *chunkGroupsFiltered); -static void ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColumnList, - uint64 chunkIndex, uint64 chunkRowIndex, - ChunkData *chunkData, Datum *columnValues, - bool *columnNulls); static ColumnBuffers * LoadColumnBuffers(Relation relation, ColumnChunkSkipNode *chunkSkipNodeArray, uint32 chunkCount, uint64 stripeOffset, @@ -142,16 +175,10 @@ ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor, readState->stripeList = stripeList; readState->projectedColumnList = projectedColumnList; readState->whereClauseList = whereClauseList; - readState->stripeBuffers = NULL; - readState->readStripeCount = 0; - readState->stripeReadRowCount = 0; readState->chunkGroupsFiltered = 0; readState->tupleDescriptor = tupleDescriptor; 
readState->stripeReadContext = stripeReadContext; - readState->chunkData = NULL; - readState->deserializedChunkIndex = -1; - readState->readRowCount = 0; - readState->totalRowCount = totalRowCount; + readState->stripeReadState = NULL; return readState; } @@ -165,111 +192,44 @@ ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor, bool ColumnarReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNulls) { - StripeMetadata *stripeMetadata = readState->currentStripeMetadata; - MemoryContext oldContext = NULL; - - /* - * We rely on first column's metadata in rest of this function. So for zero - * column tables we just return "true" for totalRowCount times. We do the - * same when no columns are projected. - */ - if (readState->projectedColumnList == NIL) + while (true) { - if (readState->totalRowCount == readState->readRowCount) + if (readState->stripeReadState == NULL) { - return false; - } - else - { - int columnCount = readState->tupleDescriptor->natts; - memset(columnNulls, 1, sizeof(bool) * columnCount); - readState->readRowCount++; - return true; - } - } + uint32 stripeCount = list_length(readState->stripeList); - /* - * If no stripes are loaded, load the next non-empty stripe. Note that when - * loading stripes, we skip over chunks whose contents can be filtered with - * the query's restriction qualifiers. So, even when a stripe is physically - * not empty, we may end up loading it as an empty stripe. 
- */ - while (readState->stripeBuffers == NULL) - { - List *stripeMetadataList = readState->stripeList; - uint32 stripeCount = list_length(stripeMetadataList); + if (readState->currentStripe >= stripeCount) + { + return false; + } - /* if we have read all stripes, return false */ - if (readState->readStripeCount == stripeCount) - { - return false; + StripeMetadata *stripeMetadata = list_nth(readState->stripeList, + readState->currentStripe); + + readState->stripeReadState = BeginStripeRead(stripeMetadata, + readState->relation, + readState->tupleDescriptor, + readState->projectedColumnList, + readState->whereClauseList, + readState->stripeReadContext); } - oldContext = MemoryContextSwitchTo(readState->stripeReadContext); - MemoryContextReset(readState->stripeReadContext); - readState->chunkData = NULL; - - stripeMetadata = list_nth(stripeMetadataList, readState->readStripeCount); - StripeBuffers *stripeBuffers = LoadFilteredStripeBuffers(readState->relation, - stripeMetadata, - readState-> - tupleDescriptor, - readState-> - projectedColumnList, - readState-> - whereClauseList, - &readState-> - chunkGroupsFiltered); - readState->readStripeCount++; - readState->currentStripeMetadata = stripeMetadata; - - MemoryContextSwitchTo(oldContext); - - if (stripeBuffers->rowCount != 0) + if (!ReadStripeNextRow(readState->stripeReadState, columnValues, columnNulls)) { - readState->stripeBuffers = stripeBuffers; - readState->stripeReadRowCount = 0; - readState->deserializedChunkIndex = -1; - break; + readState->chunkGroupsFiltered += + readState->stripeReadState->chunkGroupsFiltered; + readState->currentStripe++; + EndStripeRead(readState->stripeReadState); + readState->stripeReadState = NULL; + MemoryContextReset(readState->stripeReadContext); + + continue; } + + return true; } - uint32 chunkIndex = readState->stripeReadRowCount / stripeMetadata->chunkRowCount; - uint32 chunkRowIndex = readState->stripeReadRowCount % stripeMetadata->chunkRowCount; - - if (chunkIndex != 
readState->deserializedChunkIndex) - { - uint32 chunkRowCount = - readState->stripeBuffers->selectedChunkRowCount[chunkIndex]; - - oldContext = MemoryContextSwitchTo(readState->stripeReadContext); - - FreeChunkData(readState->chunkData); - readState->chunkData = - DeserializeChunkData(readState->stripeBuffers, chunkIndex, - chunkRowCount, readState->tupleDescriptor, - readState->projectedColumnList); - - MemoryContextSwitchTo(oldContext); - - readState->deserializedChunkIndex = chunkIndex; - } - - ReadStripeNextRow(readState->stripeBuffers, readState->projectedColumnList, - chunkIndex, chunkRowIndex, readState->chunkData, - columnValues, columnNulls); - - /* - * If we finished reading the current stripe, set stripe data to NULL. That - * way, we will load a new stripe the next time this function gets called. - */ - readState->stripeReadRowCount++; - if (readState->stripeReadRowCount == readState->stripeBuffers->rowCount) - { - readState->stripeBuffers = NULL; - } - - return true; + return false; } @@ -280,13 +240,15 @@ ColumnarReadNextRow(TableReadState *readState, Datum *columnValues, bool *column void ColumnarRescan(TableReadState *readState) { - readState->stripeBuffers = NULL; - readState->readStripeCount = 0; - readState->stripeReadRowCount = 0; + readState->stripeReadState = NULL; + readState->currentStripe = 0; + readState->chunkGroupsFiltered = 0; } -/* Finishes a columnar read operation. */ +/* + * Finishes a columnar read operation. + */ void ColumnarEndRead(TableReadState *readState) { @@ -296,6 +258,226 @@ ColumnarEndRead(TableReadState *readState) } +/* + * BeginStripeRead allocates state for reading a stripe. 
+ */ +static StripeReadState * +BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDesc, + List *projectedColumnList, List *whereClauseList, MemoryContext + stripeReadContext) +{ + MemoryContext oldContext = MemoryContextSwitchTo(stripeReadContext); + + StripeReadState *stripeReadState = palloc0(sizeof(StripeReadState)); + + stripeReadState->relation = rel; + stripeReadState->tupleDescriptor = tupleDesc; + stripeReadState->columnCount = tupleDesc->natts; + stripeReadState->chunkGroupReadState = NULL; + stripeReadState->projectedColumnList = projectedColumnList; + stripeReadState->stripeReadContext = stripeReadContext; + + /* + * If there are no attributes in the table at all, reading the chunk + * groups will fail (because there are no chunks), so we must introduce a + * special case. Also follow this special case if no attributes are + * projected, so that we won't have to deal with deleted attributes, + * either. + * + * TODO: refactor metadata so that chunk groups hold the row count; rather + * than individual chunks (which is repetitive in the normal case, and + * problematic in the case where there are zero columns). + */ + if (list_length(projectedColumnList) != 0) + { + stripeReadState->stripeBuffers = LoadFilteredStripeBuffers(rel, + stripeMetadata, + tupleDesc, + projectedColumnList, + whereClauseList, + &stripeReadState-> + chunkGroupsFiltered); + + stripeReadState->rowCount = stripeReadState->stripeBuffers->rowCount; + } + else + { + stripeReadState->stripeBuffers = NULL; + + /* + * If there are no projected columns, then no chunks will be filtered, + * so the row count is simply the stripe's row count. + */ + stripeReadState->rowCount = stripeMetadata->rowCount; + } + + MemoryContextSwitchTo(oldContext); + + + return stripeReadState; +} + + +/* + * EndStripeRead finishes a stripe read. 
+ */ +static void +EndStripeRead(StripeReadState *stripeReadState) +{ + pfree(stripeReadState); +} + + +/* + * ReadStripeNextRow: If more rows can be read from the current stripe, fill + * in non-NULL columnValues and return true. Otherwise, return false. + * + * On entry, all entries in columnNulls should be true; this function only + * sets non-NULL entries. + * + */ +static bool +ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues, + bool *columnNulls) +{ + if (stripeReadState->currentRow >= stripeReadState->rowCount) + { + Assert(stripeReadState->currentRow == stripeReadState->rowCount); + return false; + } + + /* + * If no columns are projected (which includes tables with no attributes), + * stripeBuffers is not loaded, so we just return rowCount empty tuples. + */ + if (stripeReadState->stripeBuffers == NULL) + { + if (stripeReadState->currentRow < stripeReadState->rowCount) + { + stripeReadState->currentRow++; + return true; + } + else + { + return false; + } + } + + while (true) + { + if (stripeReadState->chunkGroupReadState == NULL) + { + stripeReadState->chunkGroupReadState = BeginChunkGroupRead( + stripeReadState->stripeBuffers, + stripeReadState-> + chunkGroupIndex, + stripeReadState-> + tupleDescriptor, + stripeReadState-> + projectedColumnList, + stripeReadState-> + stripeReadContext); + } + + if (!ReadChunkGroupNextRow(stripeReadState->chunkGroupReadState, columnValues, + columnNulls)) + { + /* if this chunk group is exhausted, fetch the next one and loop */ + EndChunkGroupRead(stripeReadState->chunkGroupReadState); + stripeReadState->chunkGroupReadState = NULL; + stripeReadState->chunkGroupIndex++; + continue; + } + + stripeReadState->currentRow++; + return true; + } + + Assert(stripeReadState->currentRow == stripeReadState->rowCount); + return false; +} + + +/* + * BeginChunkGroupRead allocates state for reading a chunk group.
+ */ +static ChunkGroupReadState * +BeginChunkGroupRead(StripeBuffers *stripeBuffers, int chunkIndex, TupleDesc tupleDesc, + List *projectedColumnList, MemoryContext cxt) +{ + uint32 chunkRowCount = stripeBuffers->selectedChunkRowCount[chunkIndex]; + + MemoryContext oldContext = MemoryContextSwitchTo(cxt); + + ChunkGroupReadState *chunkGroupReadState = palloc0(sizeof(ChunkGroupReadState)); + + chunkGroupReadState->currentRow = 0; + chunkGroupReadState->rowCount = chunkRowCount; + chunkGroupReadState->columnCount = tupleDesc->natts; + chunkGroupReadState->projectedColumnList = projectedColumnList; + + chunkGroupReadState->chunkGroupData = DeserializeChunkData(stripeBuffers, chunkIndex, + chunkRowCount, tupleDesc, + projectedColumnList); + MemoryContextSwitchTo(oldContext); + + return chunkGroupReadState; +} + + +/* + * EndChunkGroupRead finishes a chunk group read. + */ +static void +EndChunkGroupRead(ChunkGroupReadState *chunkGroupReadState) +{ + FreeChunkData(chunkGroupReadState->chunkGroupData); + pfree(chunkGroupReadState); +} + + +/* + * ReadChunkGroupNextRow: if more rows can be read from the current chunk + * group, fill in non-NULL columnValues and return true. Otherwise, return + * false. + * + * When a row is returned, columnNulls has been reset to all-true and then + * cleared for each projected non-NULL entry. + */ +static bool +ReadChunkGroupNextRow(ChunkGroupReadState *chunkGroupReadState, Datum *columnValues, + bool *columnNulls) +{ + if (chunkGroupReadState->currentRow >= chunkGroupReadState->rowCount) + { + Assert(chunkGroupReadState->currentRow == chunkGroupReadState->rowCount); + return false; + } + + /* + * Initialize to all-NULL. Only non-NULL projected attributes will be set.
+ */ + memset(columnNulls, true, sizeof(bool) * chunkGroupReadState->columnCount); + + Var *projectedColumn = NULL; + foreach_ptr(projectedColumn, chunkGroupReadState->projectedColumnList) + { + const ChunkData *chunkGroupData = chunkGroupReadState->chunkGroupData; + const int rowIndex = chunkGroupReadState->currentRow; + uint32 columnIndex = projectedColumn->varattno - 1; + + if (chunkGroupData->existsArray[columnIndex][rowIndex]) + { + columnValues[columnIndex] = chunkGroupData->valueArray[columnIndex][rowIndex]; + columnNulls[columnIndex] = false; + } + } + + chunkGroupReadState->currentRow++; + return true; +} + + /* * ColumnarReadChunkGroupsFiltered * @@ -459,36 +641,6 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata, } -/* - * ReadStripeNextRow reads the next row from the given stripe, finds the projected - * column values within this row, and accordingly sets the column values and nulls. - * Note that this function sets the values for all non-projected columns to null. - */ -static void -ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColumnList, - uint64 chunkIndex, uint64 chunkRowIndex, - ChunkData *chunkData, Datum *columnValues, - bool *columnNulls) -{ - ListCell *projectedColumnCell = NULL; - - /* set all columns to null by default */ - memset(columnNulls, 1, stripeBuffers->columnCount * sizeof(bool)); - - foreach(projectedColumnCell, projectedColumnList) - { - Var *projectedColumn = lfirst(projectedColumnCell); - uint32 columnIndex = projectedColumn->varattno - 1; - - if (chunkData->existsArray[columnIndex][chunkRowIndex]) - { - columnValues[columnIndex] = chunkData->valueArray[columnIndex][chunkRowIndex]; - columnNulls[columnIndex] = false; - } - } -} - - /* * LoadColumnBuffers reads serialized column data from the given file. 
These * column data are laid out as sequential chunks in the file; and chunk positions @@ -954,7 +1106,7 @@ DeserializeDatumArray(StringInfo datumBuffer, bool *existsArray, uint32 datumCou /* - * DeserializeChunkData deserializes requested data chunk for all columns and + * DeserializeChunkGroupData deserializes requested data chunk for all columns and * stores in chunkDataArray. It uncompresses serialized data if necessary. The * function also deallocates data buffers used for previous chunk, and compressed * data buffers for the current chunk which will not be needed again. If a column