mirror of https://github.com/citusdata/citus.git
Report progress when building index on columnar tables
parent
9b4dc2f804
commit
5adab2a3ac
|
@ -668,6 +668,46 @@ FindStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* FindStripeWithHighestRowNumber returns StripeMetadata for the stripe that
|
||||||
|
* has the row with highest rowNumber by doing backward index scan on
|
||||||
|
* stripe_first_row_number_idx. If given relation is empty, then returns NULL.
|
||||||
|
*/
|
||||||
|
StripeMetadata *
|
||||||
|
FindStripeWithHighestRowNumber(Relation relation, Snapshot snapshot)
|
||||||
|
{
|
||||||
|
StripeMetadata *stripeWithHighestRowNumber = NULL;
|
||||||
|
|
||||||
|
uint64 storageId = ColumnarStorageGetStorageId(relation, false);
|
||||||
|
ScanKeyData scanKey[1];
|
||||||
|
ScanKeyInit(&scanKey[0], Anum_columnar_stripe_storageid,
|
||||||
|
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(storageId));
|
||||||
|
|
||||||
|
Relation columnarStripes = table_open(ColumnarStripeRelationId(), AccessShareLock);
|
||||||
|
Relation index = index_open(ColumnarStripeFirstRowNumberIndexRelationId(),
|
||||||
|
AccessShareLock);
|
||||||
|
SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarStripes, index,
|
||||||
|
snapshot, 1, scanKey);
|
||||||
|
|
||||||
|
HeapTuple heapTuple = systable_getnext_ordered(scanDescriptor, BackwardScanDirection);
|
||||||
|
if (HeapTupleIsValid(heapTuple))
|
||||||
|
{
|
||||||
|
TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes);
|
||||||
|
Datum datumArray[Natts_columnar_stripe];
|
||||||
|
bool isNullArray[Natts_columnar_stripe];
|
||||||
|
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
||||||
|
|
||||||
|
stripeWithHighestRowNumber = BuildStripeMetadata(datumArray);
|
||||||
|
}
|
||||||
|
|
||||||
|
systable_endscan_ordered(scanDescriptor);
|
||||||
|
index_close(index, AccessShareLock);
|
||||||
|
table_close(columnarStripes, AccessShareLock);
|
||||||
|
|
||||||
|
return stripeWithHighestRowNumber;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ReadChunkGroupRowCounts returns an array of row counts of chunk groups for the
|
* ReadChunkGroupRowCounts returns an array of row counts of chunk groups for the
|
||||||
* given stripe.
|
* given stripe.
|
||||||
|
@ -876,7 +916,8 @@ ReadDataFileStripeList(uint64 storageId, Snapshot snapshot)
|
||||||
Oid columnarStripesOid = ColumnarStripeRelationId();
|
Oid columnarStripesOid = ColumnarStripeRelationId();
|
||||||
|
|
||||||
Relation columnarStripes = table_open(columnarStripesOid, AccessShareLock);
|
Relation columnarStripes = table_open(columnarStripesOid, AccessShareLock);
|
||||||
Relation index = index_open(ColumnarStripePKeyIndexRelationId(), AccessShareLock);
|
Relation index = index_open(ColumnarStripeFirstRowNumberIndexRelationId(),
|
||||||
|
AccessShareLock);
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes);
|
TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes);
|
||||||
|
|
||||||
SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarStripes, index,
|
SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarStripes, index,
|
||||||
|
|
|
@ -114,9 +114,15 @@ static Datum * detoast_values(TupleDesc tupleDesc, Datum *orig_values, bool *isn
|
||||||
static ItemPointerData row_number_to_tid(uint64 rowNumber);
|
static ItemPointerData row_number_to_tid(uint64 rowNumber);
|
||||||
static uint64 tid_to_row_number(ItemPointerData tid);
|
static uint64 tid_to_row_number(ItemPointerData tid);
|
||||||
static void ErrorIfInvalidRowNumber(uint64 rowNumber);
|
static void ErrorIfInvalidRowNumber(uint64 rowNumber);
|
||||||
|
static void ColumnarReportTotalVirtualBlocks(Relation relation, Snapshot snapshot,
|
||||||
|
int progressArrIndex);
|
||||||
|
static BlockNumber ColumnarGetNumberOfVirtualBlocks(Relation relation, Snapshot snapshot);
|
||||||
|
static ItemPointerData ColumnarGetHighestItemPointer(Relation relation,
|
||||||
|
Snapshot snapshot);
|
||||||
static double ColumnarReadRowsIntoIndex(TableScanDesc scan,
|
static double ColumnarReadRowsIntoIndex(TableScanDesc scan,
|
||||||
Relation indexRelation,
|
Relation indexRelation,
|
||||||
IndexInfo *indexInfo,
|
IndexInfo *indexInfo,
|
||||||
|
bool progress,
|
||||||
IndexBuildCallback indexCallback,
|
IndexBuildCallback indexCallback,
|
||||||
void *indexCallbackState,
|
void *indexCallbackState,
|
||||||
EState *estate, ExprState *predicate);
|
EState *estate, ExprState *predicate);
|
||||||
|
@ -1089,11 +1095,6 @@ columnar_index_build_range_scan(Relation columnarRelation,
|
||||||
void *callback_state,
|
void *callback_state,
|
||||||
TableScanDesc scan)
|
TableScanDesc scan)
|
||||||
{
|
{
|
||||||
/*
|
|
||||||
* TODO: Should this function call pgstat_progress_update_param in
|
|
||||||
* somewhere as heapam_index_build_range_scan ?
|
|
||||||
*/
|
|
||||||
|
|
||||||
if (start_blockno != 0 || numblocks != InvalidBlockNumber)
|
if (start_blockno != 0 || numblocks != InvalidBlockNumber)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
|
@ -1183,6 +1184,12 @@ columnar_index_build_range_scan(Relation columnarRelation,
|
||||||
snapshot = scan->rs_snapshot;
|
snapshot = scan->rs_snapshot;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (progress)
|
||||||
|
{
|
||||||
|
ColumnarReportTotalVirtualBlocks(columnarRelation, snapshot,
|
||||||
|
PROGRESS_SCAN_BLOCKS_TOTAL);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Set up execution state for predicate, if any.
|
* Set up execution state for predicate, if any.
|
||||||
* Note that this is only useful for partial indexes.
|
* Note that this is only useful for partial indexes.
|
||||||
|
@ -1193,10 +1200,17 @@ columnar_index_build_range_scan(Relation columnarRelation,
|
||||||
ExprState *predicate = ExecPrepareQual(indexInfo->ii_Predicate, estate);
|
ExprState *predicate = ExecPrepareQual(indexInfo->ii_Predicate, estate);
|
||||||
|
|
||||||
double reltuples = ColumnarReadRowsIntoIndex(scan, indexRelation, indexInfo,
|
double reltuples = ColumnarReadRowsIntoIndex(scan, indexRelation, indexInfo,
|
||||||
callback, callback_state, estate,
|
progress, callback, callback_state,
|
||||||
predicate);
|
estate, predicate);
|
||||||
table_endscan(scan);
|
table_endscan(scan);
|
||||||
|
|
||||||
|
if (progress)
|
||||||
|
{
|
||||||
|
/* report the last "virtual" block as "done" */
|
||||||
|
ColumnarReportTotalVirtualBlocks(columnarRelation, snapshot,
|
||||||
|
PROGRESS_SCAN_BLOCKS_DONE);
|
||||||
|
}
|
||||||
|
|
||||||
if (snapshotRegisteredByUs)
|
if (snapshotRegisteredByUs)
|
||||||
{
|
{
|
||||||
UnregisterSnapshot(snapshot);
|
UnregisterSnapshot(snapshot);
|
||||||
|
@ -1211,6 +1225,81 @@ columnar_index_build_range_scan(Relation columnarRelation,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ColumnarReportTotalVirtualBlocks reports progress for index build based on
|
||||||
|
* number of "virtual" blocks that given relation has.
|
||||||
|
* "progressArrIndex" argument determines which entry in st_progress_param
|
||||||
|
* array should be updated. In this case, we only expect PROGRESS_SCAN_BLOCKS_TOTAL
|
||||||
|
* or PROGRESS_SCAN_BLOCKS_DONE to specify whether we want to report calculated
|
||||||
|
* number of blocks as "done" or as "total" number of "virtual" blocks to scan.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
ColumnarReportTotalVirtualBlocks(Relation relation, Snapshot snapshot,
|
||||||
|
int progressArrIndex)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Indeed, columnar tables might have gaps between row numbers, e.g
|
||||||
|
* due to aborted transactions etc. Also, ItemPointer BlockNumber's
|
||||||
|
* for columnar tables don't actually correspond to actual disk blocks
|
||||||
|
* as in heapAM. For this reason, we call them as "virtual" blocks. At
|
||||||
|
* the moment, we believe it is better to report our progress based on
|
||||||
|
* this "virtual" block concept instead of doing nothing.
|
||||||
|
*/
|
||||||
|
Assert(progressArrIndex == PROGRESS_SCAN_BLOCKS_TOTAL ||
|
||||||
|
progressArrIndex == PROGRESS_SCAN_BLOCKS_DONE);
|
||||||
|
BlockNumber nvirtualBlocks =
|
||||||
|
ColumnarGetNumberOfVirtualBlocks(relation, snapshot);
|
||||||
|
pgstat_progress_update_param(progressArrIndex, nvirtualBlocks);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ColumnarGetNumberOfVirtualBlocks returns total number of "virtual" blocks
|
||||||
|
* that given columnar table has based on based on ItemPointer BlockNumber's.
|
||||||
|
*/
|
||||||
|
static BlockNumber
|
||||||
|
ColumnarGetNumberOfVirtualBlocks(Relation relation, Snapshot snapshot)
|
||||||
|
{
|
||||||
|
ItemPointerData highestItemPointer =
|
||||||
|
ColumnarGetHighestItemPointer(relation, snapshot);
|
||||||
|
if (!ItemPointerIsValid(&highestItemPointer))
|
||||||
|
{
|
||||||
|
/* table is empty according to our snapshot */
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Since BlockNumber is 0-based, increment it by 1 to find the total
|
||||||
|
* number of "virtual" blocks.
|
||||||
|
*/
|
||||||
|
return ItemPointerGetBlockNumber(&highestItemPointer) + 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ColumnarGetHighestItemPointer returns ItemPointerData for the tuple with
|
||||||
|
* highest tid for given relation.
|
||||||
|
* If given relation is empty, then returns invalid item pointer.
|
||||||
|
*/
|
||||||
|
static ItemPointerData
|
||||||
|
ColumnarGetHighestItemPointer(Relation relation, Snapshot snapshot)
|
||||||
|
{
|
||||||
|
StripeMetadata *stripeWithHighestRowNumber =
|
||||||
|
FindStripeWithHighestRowNumber(relation, snapshot);
|
||||||
|
if (stripeWithHighestRowNumber == NULL)
|
||||||
|
{
|
||||||
|
/* table is empty according to our snapshot */
|
||||||
|
ItemPointerData invalidItemPtr;
|
||||||
|
ItemPointerSetInvalid(&invalidItemPtr);
|
||||||
|
return invalidItemPtr;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64 highestRowNumber = stripeWithHighestRowNumber->firstRowNumber +
|
||||||
|
stripeWithHighestRowNumber->rowCount - 1;
|
||||||
|
return row_number_to_tid(highestRowNumber);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ColumnarReadRowsIntoIndex builds indexRelation tuples by reading the
|
* ColumnarReadRowsIntoIndex builds indexRelation tuples by reading the
|
||||||
* actual relation based on given "scan" and returns number of tuples
|
* actual relation based on given "scan" and returns number of tuples
|
||||||
|
@ -1218,17 +1307,36 @@ columnar_index_build_range_scan(Relation columnarRelation,
|
||||||
*/
|
*/
|
||||||
static double
|
static double
|
||||||
ColumnarReadRowsIntoIndex(TableScanDesc scan, Relation indexRelation,
|
ColumnarReadRowsIntoIndex(TableScanDesc scan, Relation indexRelation,
|
||||||
IndexInfo *indexInfo, IndexBuildCallback indexCallback,
|
IndexInfo *indexInfo, bool progress,
|
||||||
void *indexCallbackState, EState *estate, ExprState *predicate)
|
IndexBuildCallback indexCallback,
|
||||||
|
void *indexCallbackState, EState *estate,
|
||||||
|
ExprState *predicate)
|
||||||
{
|
{
|
||||||
double reltuples = 0;
|
double reltuples = 0;
|
||||||
|
|
||||||
|
BlockNumber lastReportedBlockNumber = InvalidBlockNumber;
|
||||||
|
|
||||||
ExprContext *econtext = GetPerTupleExprContext(estate);
|
ExprContext *econtext = GetPerTupleExprContext(estate);
|
||||||
TupleTableSlot *slot = econtext->ecxt_scantuple;
|
TupleTableSlot *slot = econtext->ecxt_scantuple;
|
||||||
while (columnar_getnextslot(scan, ForwardScanDirection, slot))
|
while (columnar_getnextslot(scan, ForwardScanDirection, slot))
|
||||||
{
|
{
|
||||||
CHECK_FOR_INTERRUPTS();
|
CHECK_FOR_INTERRUPTS();
|
||||||
|
|
||||||
|
BlockNumber currentBlockNumber = ItemPointerGetBlockNumber(&slot->tts_tid);
|
||||||
|
if (progress && lastReportedBlockNumber != currentBlockNumber)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* columnar_getnextslot guarantees that returned tuple will
|
||||||
|
* always have a greater ItemPointer than the ones we fetched
|
||||||
|
* before, so we directly use BlockNumber to report our progress.
|
||||||
|
*/
|
||||||
|
Assert(lastReportedBlockNumber == InvalidBlockNumber ||
|
||||||
|
currentBlockNumber >= lastReportedBlockNumber);
|
||||||
|
pgstat_progress_update_param(PROGRESS_SCAN_BLOCKS_DONE,
|
||||||
|
currentBlockNumber);
|
||||||
|
lastReportedBlockNumber = currentBlockNumber;
|
||||||
|
}
|
||||||
|
|
||||||
MemoryContextReset(econtext->ecxt_per_tuple_memory);
|
MemoryContextReset(econtext->ecxt_per_tuple_memory);
|
||||||
|
|
||||||
if (predicate != NULL && !ExecQual(predicate, econtext))
|
if (predicate != NULL && !ExecQual(predicate, econtext))
|
||||||
|
|
|
@ -256,6 +256,8 @@ extern StripeSkipList * ReadStripeSkipList(RelFileNode relfilenode, uint64 strip
|
||||||
uint32 chunkCount);
|
uint32 chunkCount);
|
||||||
extern StripeMetadata * FindStripeByRowNumber(Relation relation, uint64 rowNumber,
|
extern StripeMetadata * FindStripeByRowNumber(Relation relation, uint64 rowNumber,
|
||||||
Snapshot snapshot);
|
Snapshot snapshot);
|
||||||
|
extern StripeMetadata * FindStripeWithHighestRowNumber(Relation relation,
|
||||||
|
Snapshot snapshot);
|
||||||
extern Datum columnar_relation_storageid(PG_FUNCTION_ARGS);
|
extern Datum columnar_relation_storageid(PG_FUNCTION_ARGS);
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue