diff --git a/src/backend/columnar/columnar_metadata_tables.c b/src/backend/columnar/columnar_metadata_tables.c index 9368d66ad..1c8a73335 100644 --- a/src/backend/columnar/columnar_metadata_tables.c +++ b/src/backend/columnar/columnar_metadata_tables.c @@ -83,6 +83,8 @@ static void GetHighestUsedAddressAndId(uint64 storageId, static void LockForStripeReservation(Relation rel, LOCKMODE mode); static void UnlockForStripeReservation(Relation rel, LOCKMODE mode); static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot); +static uint32 * ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 + chunkGroupCount); static Oid ColumnarStorageIdSequenceRelationId(void); static Oid ColumnarStripeRelationId(void); static Oid ColumnarStripeIndexRelationId(void); @@ -91,6 +93,7 @@ static Oid ColumnarOptionsIndexRegclass(void); static Oid ColumnarChunkRelationId(void); static Oid ColumnarChunkGroupRelationId(void); static Oid ColumnarChunkIndexRelationId(void); +static Oid ColumnarChunkGroupIndexRelationId(void); static Oid ColumnarNamespaceId(void); static ModifyState * StartModifyRelation(Relation rel); static void InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values, @@ -616,10 +619,72 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri index_close(index, AccessShareLock); table_close(columnarChunk, AccessShareLock); + chunkList->chunkGroupRowCounts = + ReadChunkGroupRowCounts(metapage->storageId, stripe, chunkCount); + return chunkList; } +/* + * ReadChunkGroupRowCounts returns an array of row counts of chunk groups for the + * given stripe. + */ +static uint32 * +ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount) +{ + Oid columnarChunkGroupOid = ColumnarChunkGroupRelationId(); + Relation columnarChunkGroup = table_open(columnarChunkGroupOid, AccessShareLock); + Relation index = index_open(ColumnarChunkGroupIndexRelationId(), AccessShareLock); + + ScanKeyData scanKey[2]; + ScanKeyInit(&scanKey[0], Anum_columnar_chunkgroup_storageid, + BTEqualStrategyNumber, F_OIDEQ, UInt64GetDatum(storageId)); + ScanKeyInit(&scanKey[1], Anum_columnar_chunkgroup_stripe, + BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe)); + + SysScanDesc scanDescriptor = + systable_beginscan_ordered(columnarChunkGroup, index, NULL, 2, scanKey); + + uint32 chunkGroupIndex = 0; + HeapTuple heapTuple = NULL; + uint32 *chunkGroupRowCounts = palloc0(chunkGroupCount * sizeof(uint32)); + + while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) + { + Datum datumArray[Natts_columnar_chunkgroup]; + bool isNullArray[Natts_columnar_chunkgroup]; + + heap_deform_tuple(heapTuple, + RelationGetDescr(columnarChunkGroup), + datumArray, isNullArray); + + uint32 tupleChunkGroupIndex = + DatumGetUInt32(datumArray[Anum_columnar_chunkgroup_chunk - 1]); + if (chunkGroupIndex >= chunkGroupCount || + tupleChunkGroupIndex != chunkGroupIndex) + { + elog(ERROR, "unexpected chunk group"); + } + + chunkGroupRowCounts[chunkGroupIndex] = + (uint32) DatumGetUInt64(datumArray[Anum_columnar_chunkgroup_row_count - 1]); + chunkGroupIndex++; + } + + if (chunkGroupIndex != chunkGroupCount) + { + elog(ERROR, "unexpected chunk group count"); + } + + systable_endscan_ordered(scanDescriptor); + index_close(index, AccessShareLock); + table_close(columnarChunkGroup, AccessShareLock); + + return chunkGroupRowCounts; +} + + /* * InsertStripeMetadataRow adds a row to columnar.stripe. */ @@ -633,7 +698,7 @@ InsertStripeMetadataRow(uint64 storageId, StripeMetadata *stripe) Int64GetDatum(stripe->fileOffset), Int64GetDatum(stripe->dataLength), Int32GetDatum(stripe->columnCount), - Int32GetDatum(stripe->chunkRowCount), + Int32GetDatum(stripe->chunkGroupRowCount), Int64GetDatum(stripe->rowCount), Int32GetDatum(stripe->chunkCount) }; @@ -767,7 +832,7 @@ UnlockForStripeReservation(Relation rel, LOCKMODE mode) StripeMetadata ReserveStripe(Relation rel, uint64 sizeBytes, uint64 rowCount, uint64 columnCount, - uint64 chunkCount, uint64 chunkRowCount) + uint64 chunkCount, uint64 chunkGroupRowCount) { StripeMetadata stripe = { 0 }; uint64 currLogicalHigh = 0; @@ -815,7 +880,7 @@ ReserveStripe(Relation rel, uint64 sizeBytes, stripe.fileOffset = resLogicalStart; stripe.dataLength = sizeBytes; stripe.chunkCount = chunkCount; - stripe.chunkRowCount = chunkRowCount; + stripe.chunkGroupRowCount = chunkGroupRowCount; stripe.columnCount = columnCount; stripe.rowCount = rowCount; stripe.id = highestId + 1; @@ -869,7 +934,7 @@ ReadDataFileStripeList(uint64 storageId, Snapshot snapshot) datumArray[Anum_columnar_stripe_column_count - 1]); stripeMetadata->chunkCount = DatumGetInt32( datumArray[Anum_columnar_stripe_chunk_count - 1]); - stripeMetadata->chunkRowCount = DatumGetInt32( + stripeMetadata->chunkGroupRowCount = DatumGetInt32( datumArray[Anum_columnar_stripe_chunk_row_count - 1]); stripeMetadata->rowCount = DatumGetInt64( datumArray[Anum_columnar_stripe_row_count - 1]); @@ -1214,6 +1279,17 @@ ColumnarChunkIndexRelationId(void) } +/* + * ColumnarChunkGroupIndexRelationId returns relation id of columnar.chunk_group_pkey. + * TODO: should we cache this similar to citus? + */ +static Oid +ColumnarChunkGroupIndexRelationId(void) +{ + return get_relname_relid("chunk_group_pkey", ColumnarNamespaceId()); +} + + /* * ColumnarNamespaceId returns namespace id of the schema we store columnar * related tables. diff --git a/src/backend/columnar/columnar_reader.c b/src/backend/columnar/columnar_reader.c index a5c85ac35..2c15f0dd1 100644 --- a/src/backend/columnar/columnar_reader.c +++ b/src/backend/columnar/columnar_reader.c @@ -73,13 +73,6 @@ struct ColumnarReadState int64 currentStripe; /* index of current stripe */ StripeReadState *stripeReadState; - /* - * Following are used for tables with zero columns, or when no - * columns are projected. - */ - uint64 totalRowCount; - uint64 readRowCount; - /* * List of Var pointers for columns in the query. We use this both for * getting vector of projected columns, and also when we want to build @@ -277,39 +270,15 @@ BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDes stripeReadState->projectedColumnList = projectedColumnList; stripeReadState->stripeReadContext = stripeReadContext; - /* - * If there are no attributes in the table at all, reading the chunk - * groups will fail (because there are no chunks), so we must introduce a - * special case. Also follow this special case if no attributes are - * projected, so that we won't have to deal with deleted attributes, - * either. - * - * TODO: refactor metadata so that chunk groups hold the row count; rather - * than individual chunks (which is repetitive in the normal case, and - * problematic in the case where there are zero columns). - */ - if (list_length(projectedColumnList) != 0) - { - stripeReadState->stripeBuffers = LoadFilteredStripeBuffers(rel, - stripeMetadata, - tupleDesc, - projectedColumnList, - whereClauseList, - &stripeReadState-> - chunkGroupsFiltered); + stripeReadState->stripeBuffers = LoadFilteredStripeBuffers(rel, + stripeMetadata, + tupleDesc, + projectedColumnList, + whereClauseList, + &stripeReadState-> + chunkGroupsFiltered); - stripeReadState->rowCount = stripeReadState->stripeBuffers->rowCount; - } - else - { - stripeReadState->stripeBuffers = NULL; - - /* - * If there are no projected columns, then no chunks will be filtered, - * so the row count is simply the stripe's row count. - */ - stripeReadState->rowCount = stripeMetadata->rowCount; - } + stripeReadState->rowCount = stripeReadState->stripeBuffers->rowCount; MemoryContextSwitchTo(oldContext); @@ -346,23 +315,6 @@ ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues, return false; } - /* - * If there are no attributes in the table at all, stripeBuffers won't be - * loaded so we just return rowCount empty tuples. - */ - if (stripeReadState->stripeBuffers == NULL) - { - if (stripeReadState->currentRow < stripeReadState->rowCount) - { - stripeReadState->currentRow++; - return true; - } - else - { - return false; - } - } - while (true) { if (stripeReadState->chunkGroupReadState == NULL) @@ -405,19 +357,21 @@ static ChunkGroupReadState * BeginChunkGroupRead(StripeBuffers *stripeBuffers, int chunkIndex, TupleDesc tupleDesc, List *projectedColumnList, MemoryContext cxt) { - uint32 chunkRowCount = stripeBuffers->selectedChunkRowCount[chunkIndex]; + uint32 chunkGroupRowCount = + stripeBuffers->selectedChunkGroupRowCounts[chunkIndex]; MemoryContext oldContext = MemoryContextSwitchTo(cxt); ChunkGroupReadState *chunkGroupReadState = palloc0(sizeof(ChunkGroupReadState)); chunkGroupReadState->currentRow = 0; - chunkGroupReadState->rowCount = chunkRowCount; + chunkGroupReadState->rowCount = chunkGroupRowCount; chunkGroupReadState->columnCount = tupleDesc->natts; chunkGroupReadState->projectedColumnList = projectedColumnList; chunkGroupReadState->chunkGroupData = DeserializeChunkData(stripeBuffers, chunkIndex, - chunkRowCount, tupleDesc, + chunkGroupRowCount, + tupleDesc, projectedColumnList); MemoryContextSwitchTo(oldContext); @@ -495,7 +449,7 @@ ColumnarReadChunkGroupsFiltered(ColumnarReadState *state) * value arrays for requested columns in columnMask. */ ChunkData * -CreateEmptyChunkData(uint32 columnCount, bool *columnMask, uint32 chunkRowCount) +CreateEmptyChunkData(uint32 columnCount, bool *columnMask, uint32 chunkGroupRowCount) { uint32 columnIndex = 0; @@ -504,15 +458,17 @@ CreateEmptyChunkData(uint32 columnCount, bool *columnMask, uint32 chunkRowCount) chunkData->valueArray = palloc0(columnCount * sizeof(Datum *)); chunkData->valueBufferArray = palloc0(columnCount * sizeof(StringInfo)); chunkData->columnCount = columnCount; - chunkData->rowCount = chunkRowCount; + chunkData->rowCount = chunkGroupRowCount; /* allocate chunk memory for deserialized data */ for (columnIndex = 0; columnIndex < columnCount; columnIndex++) { if (columnMask[columnIndex]) { - chunkData->existsArray[columnIndex] = palloc0(chunkRowCount * sizeof(bool)); - chunkData->valueArray[columnIndex] = palloc0(chunkRowCount * sizeof(Datum)); + chunkData->existsArray[columnIndex] = palloc0(chunkGroupRowCount * + sizeof(bool)); + chunkData->valueArray[columnIndex] = palloc0(chunkGroupRowCount * + sizeof(Datum)); chunkData->valueBufferArray[columnIndex] = NULL; } } @@ -601,14 +557,6 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata, SelectedChunkSkipList(stripeSkipList, projectedColumnMask, selectedChunkMask); - uint32 selectedChunkCount = selectedChunkSkipList->chunkCount; - uint32 *selectedChunkRowCount = palloc0(selectedChunkCount * sizeof(uint32)); - for (int chunkIndex = 0; chunkIndex < selectedChunkCount; chunkIndex++) - { - selectedChunkRowCount[chunkIndex] = - selectedChunkSkipList->chunkSkipNodeArray[0][chunkIndex].rowCount; - } - /* load column data for projected columns */ ColumnBuffers **columnBuffersArray = palloc0(columnCount * sizeof(ColumnBuffers *)); @@ -634,8 +582,8 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata, stripeBuffers->columnCount = columnCount; stripeBuffers->rowCount = StripeSkipListRowCount(selectedChunkSkipList); stripeBuffers->columnBuffersArray = columnBuffersArray; - stripeBuffers->selectedChunks = selectedChunkCount; - stripeBuffers->selectedChunkRowCount = selectedChunkRowCount; + stripeBuffers->selectedChunkGroupRowCounts = + selectedChunkSkipList->chunkGroupRowCounts; return stripeBuffers; } @@ -939,6 +887,7 @@ SelectedChunkSkipList(StripeSkipList *stripeSkipList, bool *projectedColumnMask, uint32 chunkIndex = 0; uint32 columnIndex = 0; uint32 columnCount = stripeSkipList->columnCount; + uint32 selectedChunkIndex = 0; for (chunkIndex = 0; chunkIndex < stripeSkipList->chunkCount; chunkIndex++) { @@ -948,13 +897,13 @@ SelectedChunkSkipList(StripeSkipList *stripeSkipList, bool *projectedColumnMask, } } - ColumnChunkSkipNode **selectedChunkSkipNodeArray = palloc0(columnCount * - sizeof(ColumnChunkSkipNode - *)); + ColumnChunkSkipNode **selectedChunkSkipNodeArray = + palloc0(columnCount * sizeof(ColumnChunkSkipNode *)); + for (columnIndex = 0; columnIndex < columnCount; columnIndex++) { - uint32 selectedChunkIndex = 0; bool firstColumn = columnIndex == 0; + selectedChunkIndex = 0; /* first column's chunk skip node is always read */ if (!projectedColumnMask[columnIndex] && !firstColumn) @@ -979,12 +928,24 @@ SelectedChunkSkipList(StripeSkipList *stripeSkipList, bool *projectedColumnMask, } } - StripeSkipList *SelectedChunkSkipList = palloc0(sizeof(StripeSkipList)); - SelectedChunkSkipList->chunkSkipNodeArray = selectedChunkSkipNodeArray; - SelectedChunkSkipList->chunkCount = selectedChunkCount; - SelectedChunkSkipList->columnCount = stripeSkipList->columnCount; + selectedChunkIndex = 0; + uint32 *chunkGroupRowCounts = palloc0(selectedChunkCount * sizeof(uint32)); + for (chunkIndex = 0; chunkIndex < stripeSkipList->chunkCount; chunkIndex++) + { + if (selectedChunkMask[chunkIndex]) + { + chunkGroupRowCounts[selectedChunkIndex++] = + stripeSkipList->chunkGroupRowCounts[chunkIndex]; + } + } - return SelectedChunkSkipList; + StripeSkipList *selectedChunkSkipList = palloc0(sizeof(StripeSkipList)); + selectedChunkSkipList->chunkSkipNodeArray = selectedChunkSkipNodeArray; + selectedChunkSkipList->chunkCount = selectedChunkCount; + selectedChunkSkipList->columnCount = stripeSkipList->columnCount; + selectedChunkSkipList->chunkGroupRowCounts = chunkGroupRowCounts; + + return selectedChunkSkipList; } @@ -998,13 +959,12 @@ StripeSkipListRowCount(StripeSkipList *stripeSkipList) { uint32 stripeSkipListRowCount = 0; uint32 chunkIndex = 0; - ColumnChunkSkipNode *firstColumnSkipNodeArray = - stripeSkipList->chunkSkipNodeArray[0]; + uint32 *chunkGroupRowCounts = stripeSkipList->chunkGroupRowCounts; for (chunkIndex = 0; chunkIndex < stripeSkipList->chunkCount; chunkIndex++) { - uint32 chunkRowCount = firstColumnSkipNodeArray[chunkIndex].rowCount; - stripeSkipListRowCount += chunkRowCount; + uint32 chunkGroupRowCount = chunkGroupRowCounts[chunkIndex]; + stripeSkipListRowCount += chunkGroupRowCount; } return stripeSkipListRowCount; diff --git a/src/include/columnar/columnar.h b/src/include/columnar/columnar.h index 0c5672cce..bac092e2a 100644 --- a/src/include/columnar/columnar.h +++ b/src/include/columnar/columnar.h @@ -96,7 +96,7 @@ typedef struct StripeMetadata uint64 dataLength; uint32 columnCount; uint32 chunkCount; - uint32 chunkRowCount; + uint32 chunkGroupRowCount; uint64 rowCount; uint64 id; } StripeMetadata; @@ -140,6 +140,7 @@ typedef struct ColumnChunkSkipNode typedef struct StripeSkipList { ColumnChunkSkipNode **chunkSkipNodeArray; + uint32 *chunkGroupRowCounts; uint32 columnCount; uint32 chunkCount; } StripeSkipList; @@ -202,13 +203,7 @@ typedef struct StripeBuffers uint32 rowCount; ColumnBuffers **columnBuffersArray; - /* - * We might skip reading some chunks because they're refuted by the - * WHERE clause. We keep number of selected chunks and number of rows - * in each of them. - */ - uint32 selectedChunks; - uint32 *selectedChunkRowCount; + uint32 *selectedChunkGroupRowCounts; } StripeBuffers; @@ -256,7 +251,7 @@ extern int64 ColumnarReadChunkGroupsFiltered(ColumnarReadState *state); extern FmgrInfo * GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId, int16 procedureId); extern ChunkData * CreateEmptyChunkData(uint32 columnCount, bool *columnMask, - uint32 chunkRowCount); + uint32 chunkGroupRowCount); extern void FreeChunkData(ChunkData *chunkData); extern uint64 ColumnarTableRowCount(Relation relation); extern bool CompressBuffer(StringInfo inputBuffer, @@ -283,7 +278,7 @@ extern List * StripesForRelfilenode(RelFileNode relfilenode); extern uint64 GetHighestUsedAddress(RelFileNode relfilenode); extern StripeMetadata ReserveStripe(Relation rel, uint64 size, uint64 rowCount, uint64 columnCount, - uint64 chunkCount, uint64 chunkRowCount); + uint64 chunkCount, uint64 chunkGroupRowCount); extern void SaveStripeSkipList(RelFileNode relfilenode, uint64 stripe, StripeSkipList *stripeSkipList, TupleDesc tupleDescriptor);