mirror of https://github.com/citusdata/citus.git
Use chunk groups to read columnar data (#4768)
parent
2f30614fe3
commit
1a05131331
|
@ -83,6 +83,8 @@ static void GetHighestUsedAddressAndId(uint64 storageId,
|
|||
static void LockForStripeReservation(Relation rel, LOCKMODE mode);
|
||||
static void UnlockForStripeReservation(Relation rel, LOCKMODE mode);
|
||||
static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot);
|
||||
static uint32 * ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32
|
||||
chunkGroupCount);
|
||||
static Oid ColumnarStorageIdSequenceRelationId(void);
|
||||
static Oid ColumnarStripeRelationId(void);
|
||||
static Oid ColumnarStripeIndexRelationId(void);
|
||||
|
@ -91,6 +93,7 @@ static Oid ColumnarOptionsIndexRegclass(void);
|
|||
static Oid ColumnarChunkRelationId(void);
|
||||
static Oid ColumnarChunkGroupRelationId(void);
|
||||
static Oid ColumnarChunkIndexRelationId(void);
|
||||
static Oid ColumnarChunkGroupIndexRelationId(void);
|
||||
static Oid ColumnarNamespaceId(void);
|
||||
static ModifyState * StartModifyRelation(Relation rel);
|
||||
static void InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values,
|
||||
|
@ -616,10 +619,72 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri
|
|||
index_close(index, AccessShareLock);
|
||||
table_close(columnarChunk, AccessShareLock);
|
||||
|
||||
chunkList->chunkGroupRowCounts =
|
||||
ReadChunkGroupRowCounts(metapage->storageId, stripe, chunkCount);
|
||||
|
||||
return chunkList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ReadChunkGroupRowCounts returns an array of row counts of chunk groups for the
|
||||
* given stripe.
|
||||
*/
|
||||
static uint32 *
|
||||
ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount)
|
||||
{
|
||||
Oid columnarChunkGroupOid = ColumnarChunkGroupRelationId();
|
||||
Relation columnarChunkGroup = table_open(columnarChunkGroupOid, AccessShareLock);
|
||||
Relation index = index_open(ColumnarChunkGroupIndexRelationId(), AccessShareLock);
|
||||
|
||||
ScanKeyData scanKey[2];
|
||||
ScanKeyInit(&scanKey[0], Anum_columnar_chunkgroup_storageid,
|
||||
BTEqualStrategyNumber, F_OIDEQ, UInt64GetDatum(storageId));
|
||||
ScanKeyInit(&scanKey[1], Anum_columnar_chunkgroup_stripe,
|
||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe));
|
||||
|
||||
SysScanDesc scanDescriptor =
|
||||
systable_beginscan_ordered(columnarChunkGroup, index, NULL, 2, scanKey);
|
||||
|
||||
uint32 chunkGroupIndex = 0;
|
||||
HeapTuple heapTuple = NULL;
|
||||
uint32 *chunkGroupRowCounts = palloc0(chunkGroupCount * sizeof(uint32));
|
||||
|
||||
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
|
||||
{
|
||||
Datum datumArray[Natts_columnar_chunkgroup];
|
||||
bool isNullArray[Natts_columnar_chunkgroup];
|
||||
|
||||
heap_deform_tuple(heapTuple,
|
||||
RelationGetDescr(columnarChunkGroup),
|
||||
datumArray, isNullArray);
|
||||
|
||||
uint32 tupleChunkGroupIndex =
|
||||
DatumGetUInt32(datumArray[Anum_columnar_chunkgroup_chunk - 1]);
|
||||
if (chunkGroupIndex >= chunkGroupCount ||
|
||||
tupleChunkGroupIndex != chunkGroupIndex)
|
||||
{
|
||||
elog(ERROR, "unexpected chunk group");
|
||||
}
|
||||
|
||||
chunkGroupRowCounts[chunkGroupIndex] =
|
||||
(uint32) DatumGetUInt64(datumArray[Anum_columnar_chunkgroup_row_count - 1]);
|
||||
chunkGroupIndex++;
|
||||
}
|
||||
|
||||
if (chunkGroupIndex != chunkGroupCount)
|
||||
{
|
||||
elog(ERROR, "unexpected chunk group count");
|
||||
}
|
||||
|
||||
systable_endscan_ordered(scanDescriptor);
|
||||
index_close(index, AccessShareLock);
|
||||
table_close(columnarChunkGroup, AccessShareLock);
|
||||
|
||||
return chunkGroupRowCounts;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* InsertStripeMetadataRow adds a row to columnar.stripe.
|
||||
*/
|
||||
|
@ -633,7 +698,7 @@ InsertStripeMetadataRow(uint64 storageId, StripeMetadata *stripe)
|
|||
Int64GetDatum(stripe->fileOffset),
|
||||
Int64GetDatum(stripe->dataLength),
|
||||
Int32GetDatum(stripe->columnCount),
|
||||
Int32GetDatum(stripe->chunkRowCount),
|
||||
Int32GetDatum(stripe->chunkGroupRowCount),
|
||||
Int64GetDatum(stripe->rowCount),
|
||||
Int32GetDatum(stripe->chunkCount)
|
||||
};
|
||||
|
@ -767,7 +832,7 @@ UnlockForStripeReservation(Relation rel, LOCKMODE mode)
|
|||
StripeMetadata
|
||||
ReserveStripe(Relation rel, uint64 sizeBytes,
|
||||
uint64 rowCount, uint64 columnCount,
|
||||
uint64 chunkCount, uint64 chunkRowCount)
|
||||
uint64 chunkCount, uint64 chunkGroupRowCount)
|
||||
{
|
||||
StripeMetadata stripe = { 0 };
|
||||
uint64 currLogicalHigh = 0;
|
||||
|
@ -815,7 +880,7 @@ ReserveStripe(Relation rel, uint64 sizeBytes,
|
|||
stripe.fileOffset = resLogicalStart;
|
||||
stripe.dataLength = sizeBytes;
|
||||
stripe.chunkCount = chunkCount;
|
||||
stripe.chunkRowCount = chunkRowCount;
|
||||
stripe.chunkGroupRowCount = chunkGroupRowCount;
|
||||
stripe.columnCount = columnCount;
|
||||
stripe.rowCount = rowCount;
|
||||
stripe.id = highestId + 1;
|
||||
|
@ -869,7 +934,7 @@ ReadDataFileStripeList(uint64 storageId, Snapshot snapshot)
|
|||
datumArray[Anum_columnar_stripe_column_count - 1]);
|
||||
stripeMetadata->chunkCount = DatumGetInt32(
|
||||
datumArray[Anum_columnar_stripe_chunk_count - 1]);
|
||||
stripeMetadata->chunkRowCount = DatumGetInt32(
|
||||
stripeMetadata->chunkGroupRowCount = DatumGetInt32(
|
||||
datumArray[Anum_columnar_stripe_chunk_row_count - 1]);
|
||||
stripeMetadata->rowCount = DatumGetInt64(
|
||||
datumArray[Anum_columnar_stripe_row_count - 1]);
|
||||
|
@ -1214,6 +1279,17 @@ ColumnarChunkIndexRelationId(void)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ColumnarChunkGroupIndexRelationId returns relation id of columnar.chunk_group_pkey.
|
||||
* TODO: should we cache this similar to citus?
|
||||
*/
|
||||
static Oid
|
||||
ColumnarChunkGroupIndexRelationId(void)
|
||||
{
|
||||
return get_relname_relid("chunk_group_pkey", ColumnarNamespaceId());
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ColumnarNamespaceId returns namespace id of the schema we store columnar
|
||||
* related tables.
|
||||
|
|
|
@ -73,13 +73,6 @@ struct ColumnarReadState
|
|||
int64 currentStripe; /* index of current stripe */
|
||||
StripeReadState *stripeReadState;
|
||||
|
||||
/*
|
||||
* Following are used for tables with zero columns, or when no
|
||||
* columns are projected.
|
||||
*/
|
||||
uint64 totalRowCount;
|
||||
uint64 readRowCount;
|
||||
|
||||
/*
|
||||
* List of Var pointers for columns in the query. We use this both for
|
||||
* getting vector of projected columns, and also when we want to build
|
||||
|
@ -277,39 +270,15 @@ BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDes
|
|||
stripeReadState->projectedColumnList = projectedColumnList;
|
||||
stripeReadState->stripeReadContext = stripeReadContext;
|
||||
|
||||
/*
|
||||
* If there are no attributes in the table at all, reading the chunk
|
||||
* groups will fail (because there are no chunks), so we must introduce a
|
||||
* special case. Also follow this special case if no attributes are
|
||||
* projected, so that we won't have to deal with deleted attributes,
|
||||
* either.
|
||||
*
|
||||
* TODO: refactor metadata so that chunk groups hold the row count; rather
|
||||
* than individual chunks (which is repetitive in the normal case, and
|
||||
* problematic in the case where there are zero columns).
|
||||
*/
|
||||
if (list_length(projectedColumnList) != 0)
|
||||
{
|
||||
stripeReadState->stripeBuffers = LoadFilteredStripeBuffers(rel,
|
||||
stripeMetadata,
|
||||
tupleDesc,
|
||||
projectedColumnList,
|
||||
whereClauseList,
|
||||
&stripeReadState->
|
||||
chunkGroupsFiltered);
|
||||
stripeReadState->stripeBuffers = LoadFilteredStripeBuffers(rel,
|
||||
stripeMetadata,
|
||||
tupleDesc,
|
||||
projectedColumnList,
|
||||
whereClauseList,
|
||||
&stripeReadState->
|
||||
chunkGroupsFiltered);
|
||||
|
||||
stripeReadState->rowCount = stripeReadState->stripeBuffers->rowCount;
|
||||
}
|
||||
else
|
||||
{
|
||||
stripeReadState->stripeBuffers = NULL;
|
||||
|
||||
/*
|
||||
* If there are no projected columns, then no chunks will be filtered,
|
||||
* so the row count is simply the stripe's row count.
|
||||
*/
|
||||
stripeReadState->rowCount = stripeMetadata->rowCount;
|
||||
}
|
||||
stripeReadState->rowCount = stripeReadState->stripeBuffers->rowCount;
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
|
@ -346,23 +315,6 @@ ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues,
|
|||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* If there are no attributes in the table at all, stripeBuffers won't be
|
||||
* loaded so we just return rowCount empty tuples.
|
||||
*/
|
||||
if (stripeReadState->stripeBuffers == NULL)
|
||||
{
|
||||
if (stripeReadState->currentRow < stripeReadState->rowCount)
|
||||
{
|
||||
stripeReadState->currentRow++;
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
while (true)
|
||||
{
|
||||
if (stripeReadState->chunkGroupReadState == NULL)
|
||||
|
@ -405,19 +357,21 @@ static ChunkGroupReadState *
|
|||
BeginChunkGroupRead(StripeBuffers *stripeBuffers, int chunkIndex, TupleDesc tupleDesc,
|
||||
List *projectedColumnList, MemoryContext cxt)
|
||||
{
|
||||
uint32 chunkRowCount = stripeBuffers->selectedChunkRowCount[chunkIndex];
|
||||
uint32 chunkGroupRowCount =
|
||||
stripeBuffers->selectedChunkGroupRowCounts[chunkIndex];
|
||||
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(cxt);
|
||||
|
||||
ChunkGroupReadState *chunkGroupReadState = palloc0(sizeof(ChunkGroupReadState));
|
||||
|
||||
chunkGroupReadState->currentRow = 0;
|
||||
chunkGroupReadState->rowCount = chunkRowCount;
|
||||
chunkGroupReadState->rowCount = chunkGroupRowCount;
|
||||
chunkGroupReadState->columnCount = tupleDesc->natts;
|
||||
chunkGroupReadState->projectedColumnList = projectedColumnList;
|
||||
|
||||
chunkGroupReadState->chunkGroupData = DeserializeChunkData(stripeBuffers, chunkIndex,
|
||||
chunkRowCount, tupleDesc,
|
||||
chunkGroupRowCount,
|
||||
tupleDesc,
|
||||
projectedColumnList);
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
|
@ -495,7 +449,7 @@ ColumnarReadChunkGroupsFiltered(ColumnarReadState *state)
|
|||
* value arrays for requested columns in columnMask.
|
||||
*/
|
||||
ChunkData *
|
||||
CreateEmptyChunkData(uint32 columnCount, bool *columnMask, uint32 chunkRowCount)
|
||||
CreateEmptyChunkData(uint32 columnCount, bool *columnMask, uint32 chunkGroupRowCount)
|
||||
{
|
||||
uint32 columnIndex = 0;
|
||||
|
||||
|
@ -504,15 +458,17 @@ CreateEmptyChunkData(uint32 columnCount, bool *columnMask, uint32 chunkRowCount)
|
|||
chunkData->valueArray = palloc0(columnCount * sizeof(Datum *));
|
||||
chunkData->valueBufferArray = palloc0(columnCount * sizeof(StringInfo));
|
||||
chunkData->columnCount = columnCount;
|
||||
chunkData->rowCount = chunkRowCount;
|
||||
chunkData->rowCount = chunkGroupRowCount;
|
||||
|
||||
/* allocate chunk memory for deserialized data */
|
||||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||
{
|
||||
if (columnMask[columnIndex])
|
||||
{
|
||||
chunkData->existsArray[columnIndex] = palloc0(chunkRowCount * sizeof(bool));
|
||||
chunkData->valueArray[columnIndex] = palloc0(chunkRowCount * sizeof(Datum));
|
||||
chunkData->existsArray[columnIndex] = palloc0(chunkGroupRowCount *
|
||||
sizeof(bool));
|
||||
chunkData->valueArray[columnIndex] = palloc0(chunkGroupRowCount *
|
||||
sizeof(Datum));
|
||||
chunkData->valueBufferArray[columnIndex] = NULL;
|
||||
}
|
||||
}
|
||||
|
@ -601,14 +557,6 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
|||
SelectedChunkSkipList(stripeSkipList, projectedColumnMask,
|
||||
selectedChunkMask);
|
||||
|
||||
uint32 selectedChunkCount = selectedChunkSkipList->chunkCount;
|
||||
uint32 *selectedChunkRowCount = palloc0(selectedChunkCount * sizeof(uint32));
|
||||
for (int chunkIndex = 0; chunkIndex < selectedChunkCount; chunkIndex++)
|
||||
{
|
||||
selectedChunkRowCount[chunkIndex] =
|
||||
selectedChunkSkipList->chunkSkipNodeArray[0][chunkIndex].rowCount;
|
||||
}
|
||||
|
||||
/* load column data for projected columns */
|
||||
ColumnBuffers **columnBuffersArray = palloc0(columnCount * sizeof(ColumnBuffers *));
|
||||
|
||||
|
@ -634,8 +582,8 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
|||
stripeBuffers->columnCount = columnCount;
|
||||
stripeBuffers->rowCount = StripeSkipListRowCount(selectedChunkSkipList);
|
||||
stripeBuffers->columnBuffersArray = columnBuffersArray;
|
||||
stripeBuffers->selectedChunks = selectedChunkCount;
|
||||
stripeBuffers->selectedChunkRowCount = selectedChunkRowCount;
|
||||
stripeBuffers->selectedChunkGroupRowCounts =
|
||||
selectedChunkSkipList->chunkGroupRowCounts;
|
||||
|
||||
return stripeBuffers;
|
||||
}
|
||||
|
@ -939,6 +887,7 @@ SelectedChunkSkipList(StripeSkipList *stripeSkipList, bool *projectedColumnMask,
|
|||
uint32 chunkIndex = 0;
|
||||
uint32 columnIndex = 0;
|
||||
uint32 columnCount = stripeSkipList->columnCount;
|
||||
uint32 selectedChunkIndex = 0;
|
||||
|
||||
for (chunkIndex = 0; chunkIndex < stripeSkipList->chunkCount; chunkIndex++)
|
||||
{
|
||||
|
@ -948,13 +897,13 @@ SelectedChunkSkipList(StripeSkipList *stripeSkipList, bool *projectedColumnMask,
|
|||
}
|
||||
}
|
||||
|
||||
ColumnChunkSkipNode **selectedChunkSkipNodeArray = palloc0(columnCount *
|
||||
sizeof(ColumnChunkSkipNode
|
||||
*));
|
||||
ColumnChunkSkipNode **selectedChunkSkipNodeArray =
|
||||
palloc0(columnCount * sizeof(ColumnChunkSkipNode *));
|
||||
|
||||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||
{
|
||||
uint32 selectedChunkIndex = 0;
|
||||
bool firstColumn = columnIndex == 0;
|
||||
selectedChunkIndex = 0;
|
||||
|
||||
/* first column's chunk skip node is always read */
|
||||
if (!projectedColumnMask[columnIndex] && !firstColumn)
|
||||
|
@ -979,12 +928,24 @@ SelectedChunkSkipList(StripeSkipList *stripeSkipList, bool *projectedColumnMask,
|
|||
}
|
||||
}
|
||||
|
||||
StripeSkipList *SelectedChunkSkipList = palloc0(sizeof(StripeSkipList));
|
||||
SelectedChunkSkipList->chunkSkipNodeArray = selectedChunkSkipNodeArray;
|
||||
SelectedChunkSkipList->chunkCount = selectedChunkCount;
|
||||
SelectedChunkSkipList->columnCount = stripeSkipList->columnCount;
|
||||
selectedChunkIndex = 0;
|
||||
uint32 *chunkGroupRowCounts = palloc0(selectedChunkCount * sizeof(uint32));
|
||||
for (chunkIndex = 0; chunkIndex < stripeSkipList->chunkCount; chunkIndex++)
|
||||
{
|
||||
if (selectedChunkMask[chunkIndex])
|
||||
{
|
||||
chunkGroupRowCounts[selectedChunkIndex++] =
|
||||
stripeSkipList->chunkGroupRowCounts[chunkIndex];
|
||||
}
|
||||
}
|
||||
|
||||
return SelectedChunkSkipList;
|
||||
StripeSkipList *selectedChunkSkipList = palloc0(sizeof(StripeSkipList));
|
||||
selectedChunkSkipList->chunkSkipNodeArray = selectedChunkSkipNodeArray;
|
||||
selectedChunkSkipList->chunkCount = selectedChunkCount;
|
||||
selectedChunkSkipList->columnCount = stripeSkipList->columnCount;
|
||||
selectedChunkSkipList->chunkGroupRowCounts = chunkGroupRowCounts;
|
||||
|
||||
return selectedChunkSkipList;
|
||||
}
|
||||
|
||||
|
||||
|
@ -998,13 +959,12 @@ StripeSkipListRowCount(StripeSkipList *stripeSkipList)
|
|||
{
|
||||
uint32 stripeSkipListRowCount = 0;
|
||||
uint32 chunkIndex = 0;
|
||||
ColumnChunkSkipNode *firstColumnSkipNodeArray =
|
||||
stripeSkipList->chunkSkipNodeArray[0];
|
||||
uint32 *chunkGroupRowCounts = stripeSkipList->chunkGroupRowCounts;
|
||||
|
||||
for (chunkIndex = 0; chunkIndex < stripeSkipList->chunkCount; chunkIndex++)
|
||||
{
|
||||
uint32 chunkRowCount = firstColumnSkipNodeArray[chunkIndex].rowCount;
|
||||
stripeSkipListRowCount += chunkRowCount;
|
||||
uint32 chunkGroupRowCount = chunkGroupRowCounts[chunkIndex];
|
||||
stripeSkipListRowCount += chunkGroupRowCount;
|
||||
}
|
||||
|
||||
return stripeSkipListRowCount;
|
||||
|
|
|
@ -96,7 +96,7 @@ typedef struct StripeMetadata
|
|||
uint64 dataLength;
|
||||
uint32 columnCount;
|
||||
uint32 chunkCount;
|
||||
uint32 chunkRowCount;
|
||||
uint32 chunkGroupRowCount;
|
||||
uint64 rowCount;
|
||||
uint64 id;
|
||||
} StripeMetadata;
|
||||
|
@ -140,6 +140,7 @@ typedef struct ColumnChunkSkipNode
|
|||
typedef struct StripeSkipList
|
||||
{
|
||||
ColumnChunkSkipNode **chunkSkipNodeArray;
|
||||
uint32 *chunkGroupRowCounts;
|
||||
uint32 columnCount;
|
||||
uint32 chunkCount;
|
||||
} StripeSkipList;
|
||||
|
@ -202,13 +203,7 @@ typedef struct StripeBuffers
|
|||
uint32 rowCount;
|
||||
ColumnBuffers **columnBuffersArray;
|
||||
|
||||
/*
|
||||
* We might skip reading some chunks because they're refuted by the
|
||||
* WHERE clause. We keep number of selected chunks and number of rows
|
||||
* in each of them.
|
||||
*/
|
||||
uint32 selectedChunks;
|
||||
uint32 *selectedChunkRowCount;
|
||||
uint32 *selectedChunkGroupRowCounts;
|
||||
} StripeBuffers;
|
||||
|
||||
|
||||
|
@ -256,7 +251,7 @@ extern int64 ColumnarReadChunkGroupsFiltered(ColumnarReadState *state);
|
|||
extern FmgrInfo * GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId,
|
||||
int16 procedureId);
|
||||
extern ChunkData * CreateEmptyChunkData(uint32 columnCount, bool *columnMask,
|
||||
uint32 chunkRowCount);
|
||||
uint32 chunkGroupRowCount);
|
||||
extern void FreeChunkData(ChunkData *chunkData);
|
||||
extern uint64 ColumnarTableRowCount(Relation relation);
|
||||
extern bool CompressBuffer(StringInfo inputBuffer,
|
||||
|
@ -283,7 +278,7 @@ extern List * StripesForRelfilenode(RelFileNode relfilenode);
|
|||
extern uint64 GetHighestUsedAddress(RelFileNode relfilenode);
|
||||
extern StripeMetadata ReserveStripe(Relation rel, uint64 size,
|
||||
uint64 rowCount, uint64 columnCount,
|
||||
uint64 chunkCount, uint64 chunkRowCount);
|
||||
uint64 chunkCount, uint64 chunkGroupRowCount);
|
||||
extern void SaveStripeSkipList(RelFileNode relfilenode, uint64 stripe,
|
||||
StripeSkipList *stripeSkipList,
|
||||
TupleDesc tupleDescriptor);
|
||||
|
|
Loading…
Reference in New Issue