diff --git a/src/backend/columnar/columnar_metadata.c b/src/backend/columnar/columnar_metadata.c index af62227a6..0ebb533c7 100644 --- a/src/backend/columnar/columnar_metadata.c +++ b/src/backend/columnar/columnar_metadata.c @@ -668,6 +668,46 @@ FindStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot) } +/* + * FindStripeWithHighestRowNumber returns StripeMetadata for the stripe that + * has the row with highest rowNumber by doing backward index scan on + * stripe_first_row_number_idx. If given relation is empty, then returns NULL. + */ +StripeMetadata * +FindStripeWithHighestRowNumber(Relation relation, Snapshot snapshot) +{ + StripeMetadata *stripeWithHighestRowNumber = NULL; + + uint64 storageId = ColumnarStorageGetStorageId(relation, false); + ScanKeyData scanKey[1]; + ScanKeyInit(&scanKey[0], Anum_columnar_stripe_storageid, + BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(storageId)); + + Relation columnarStripes = table_open(ColumnarStripeRelationId(), AccessShareLock); + Relation index = index_open(ColumnarStripeFirstRowNumberIndexRelationId(), + AccessShareLock); + SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarStripes, index, + snapshot, 1, scanKey); + + HeapTuple heapTuple = systable_getnext_ordered(scanDescriptor, BackwardScanDirection); + if (HeapTupleIsValid(heapTuple)) + { + TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes); + Datum datumArray[Natts_columnar_stripe]; + bool isNullArray[Natts_columnar_stripe]; + heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); + + stripeWithHighestRowNumber = BuildStripeMetadata(datumArray); + } + + systable_endscan_ordered(scanDescriptor); + index_close(index, AccessShareLock); + table_close(columnarStripes, AccessShareLock); + + return stripeWithHighestRowNumber; +} + + /* * ReadChunkGroupRowCounts returns an array of row counts of chunk groups for the * given stripe. @@ -876,7 +916,8 @@ ReadDataFileStripeList(uint64 storageId, Snapshot snapshot) Oid columnarStripesOid = ColumnarStripeRelationId(); Relation columnarStripes = table_open(columnarStripesOid, AccessShareLock); - Relation index = index_open(ColumnarStripePKeyIndexRelationId(), AccessShareLock); + Relation index = index_open(ColumnarStripeFirstRowNumberIndexRelationId(), + AccessShareLock); TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes); SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarStripes, index, diff --git a/src/backend/columnar/columnar_tableam.c b/src/backend/columnar/columnar_tableam.c index 3ed5d8187..89cb10ece 100644 --- a/src/backend/columnar/columnar_tableam.c +++ b/src/backend/columnar/columnar_tableam.c @@ -114,9 +114,15 @@ static Datum * detoast_values(TupleDesc tupleDesc, Datum *orig_values, bool *isn static ItemPointerData row_number_to_tid(uint64 rowNumber); static uint64 tid_to_row_number(ItemPointerData tid); static void ErrorIfInvalidRowNumber(uint64 rowNumber); +static void ColumnarReportTotalVirtualBlocks(Relation relation, Snapshot snapshot, + int progressArrIndex); +static BlockNumber ColumnarGetNumberOfVirtualBlocks(Relation relation, Snapshot snapshot); +static ItemPointerData ColumnarGetHighestItemPointer(Relation relation, + Snapshot snapshot); static double ColumnarReadRowsIntoIndex(TableScanDesc scan, Relation indexRelation, IndexInfo *indexInfo, + bool progress, IndexBuildCallback indexCallback, void *indexCallbackState, EState *estate, ExprState *predicate); @@ -1089,11 +1095,6 @@ columnar_index_build_range_scan(Relation columnarRelation, void *callback_state, TableScanDesc scan) { - /* - * TODO: Should this function call pgstat_progress_update_param in - * somewhere as heapam_index_build_range_scan ? - */ - if (start_blockno != 0 || numblocks != InvalidBlockNumber) { /* @@ -1183,6 +1184,12 @@ columnar_index_build_range_scan(Relation columnarRelation, snapshot = scan->rs_snapshot; } + if (progress) + { + ColumnarReportTotalVirtualBlocks(columnarRelation, snapshot, + PROGRESS_SCAN_BLOCKS_TOTAL); + } + /* * Set up execution state for predicate, if any. * Note that this is only useful for partial indexes. @@ -1193,10 +1200,17 @@ columnar_index_build_range_scan(Relation columnarRelation, ExprState *predicate = ExecPrepareQual(indexInfo->ii_Predicate, estate); double reltuples = ColumnarReadRowsIntoIndex(scan, indexRelation, indexInfo, - callback, callback_state, estate, - predicate); + progress, callback, callback_state, + estate, predicate); table_endscan(scan); + if (progress) + { + /* report the last "virtual" block as "done" */ + ColumnarReportTotalVirtualBlocks(columnarRelation, snapshot, + PROGRESS_SCAN_BLOCKS_DONE); + } + if (snapshotRegisteredByUs) { UnregisterSnapshot(snapshot); @@ -1211,6 +1225,81 @@ columnar_index_build_range_scan(Relation columnarRelation, } +/* + * ColumnarReportTotalVirtualBlocks reports progress for index build based on + * number of "virtual" blocks that given relation has. + * "progressArrIndex" argument determines which entry in st_progress_param + * array should be updated. In this case, we only expect PROGRESS_SCAN_BLOCKS_TOTAL + * or PROGRESS_SCAN_BLOCKS_DONE to specify whether we want to report calculated + * number of blocks as "done" or as "total" number of "virtual" blocks to scan. + */ +static void +ColumnarReportTotalVirtualBlocks(Relation relation, Snapshot snapshot, + int progressArrIndex) +{ + /* + * Indeed, columnar tables might have gaps between row numbers, e.g + * due to aborted transactions etc. Also, ItemPointer BlockNumber's + * for columnar tables don't actually correspond to actual disk blocks + * as in heapAM. For this reason, we call them as "virtual" blocks. At + * the moment, we believe it is better to report our progress based on + * this "virtual" block concept instead of doing nothing. + */ + Assert(progressArrIndex == PROGRESS_SCAN_BLOCKS_TOTAL || + progressArrIndex == PROGRESS_SCAN_BLOCKS_DONE); + BlockNumber nvirtualBlocks = + ColumnarGetNumberOfVirtualBlocks(relation, snapshot); + pgstat_progress_update_param(progressArrIndex, nvirtualBlocks); +} + + +/* + * ColumnarGetNumberOfVirtualBlocks returns total number of "virtual" blocks + * that given columnar table has based on based on ItemPointer BlockNumber's. + */ +static BlockNumber +ColumnarGetNumberOfVirtualBlocks(Relation relation, Snapshot snapshot) +{ + ItemPointerData highestItemPointer = + ColumnarGetHighestItemPointer(relation, snapshot); + if (!ItemPointerIsValid(&highestItemPointer)) + { + /* table is empty according to our snapshot */ + return 0; + } + + /* + * Since BlockNumber is 0-based, increment it by 1 to find the total + * number of "virtual" blocks. + */ + return ItemPointerGetBlockNumber(&highestItemPointer) + 1; +} + + +/* + * ColumnarGetHighestItemPointer returns ItemPointerData for the tuple with + * highest tid for given relation. + * If given relation is empty, then returns invalid item pointer. + */ +static ItemPointerData +ColumnarGetHighestItemPointer(Relation relation, Snapshot snapshot) +{ + StripeMetadata *stripeWithHighestRowNumber = + FindStripeWithHighestRowNumber(relation, snapshot); + if (stripeWithHighestRowNumber == NULL) + { + /* table is empty according to our snapshot */ + ItemPointerData invalidItemPtr; + ItemPointerSetInvalid(&invalidItemPtr); + return invalidItemPtr; + } + + uint64 highestRowNumber = stripeWithHighestRowNumber->firstRowNumber + + stripeWithHighestRowNumber->rowCount - 1; + return row_number_to_tid(highestRowNumber); +} + + /* * ColumnarReadRowsIntoIndex builds indexRelation tuples by reading the * actual relation based on given "scan" and returns number of tuples @@ -1218,17 +1307,36 @@ columnar_index_build_range_scan(Relation columnarRelation, */ static double ColumnarReadRowsIntoIndex(TableScanDesc scan, Relation indexRelation, - IndexInfo *indexInfo, IndexBuildCallback indexCallback, - void *indexCallbackState, EState *estate, ExprState *predicate) + IndexInfo *indexInfo, bool progress, + IndexBuildCallback indexCallback, + void *indexCallbackState, EState *estate, + ExprState *predicate) { double reltuples = 0; + BlockNumber lastReportedBlockNumber = InvalidBlockNumber; + ExprContext *econtext = GetPerTupleExprContext(estate); TupleTableSlot *slot = econtext->ecxt_scantuple; while (columnar_getnextslot(scan, ForwardScanDirection, slot)) { CHECK_FOR_INTERRUPTS(); + BlockNumber currentBlockNumber = ItemPointerGetBlockNumber(&slot->tts_tid); + if (progress && lastReportedBlockNumber != currentBlockNumber) + { + /* + * columnar_getnextslot guarantees that returned tuple will + * always have a greater ItemPointer than the ones we fetched + * before, so we directly use BlockNumber to report our progress. + */ + Assert(lastReportedBlockNumber == InvalidBlockNumber || + currentBlockNumber >= lastReportedBlockNumber); + pgstat_progress_update_param(PROGRESS_SCAN_BLOCKS_DONE, + currentBlockNumber); + lastReportedBlockNumber = currentBlockNumber; + } + MemoryContextReset(econtext->ecxt_per_tuple_memory); if (predicate != NULL && !ExecQual(predicate, econtext)) diff --git a/src/include/columnar/columnar.h b/src/include/columnar/columnar.h index 8288a20aa..e4770acc2 100644 --- a/src/include/columnar/columnar.h +++ b/src/include/columnar/columnar.h @@ -256,6 +256,8 @@ extern StripeSkipList * ReadStripeSkipList(RelFileNode relfilenode, uint64 strip uint32 chunkCount); extern StripeMetadata * FindStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot); +extern StripeMetadata * FindStripeWithHighestRowNumber(Relation relation, + Snapshot snapshot); extern Datum columnar_relation_storageid(PG_FUNCTION_ARGS);