mirror of https://github.com/citusdata/citus.git
wip
parent
67bc990c1c
commit
ac40014d04
|
@ -83,8 +83,6 @@ static void GetHighestUsedAddressAndId(uint64 storageId,
|
|||
static void LockForStripeReservation(Relation rel, LOCKMODE mode);
|
||||
static void UnlockForStripeReservation(Relation rel, LOCKMODE mode);
|
||||
static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot);
|
||||
static uint32 * ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32
|
||||
chunkGroupCount);
|
||||
static Oid ColumnarStorageIdSequenceRelationId(void);
|
||||
static Oid ColumnarStripeRelationId(void);
|
||||
static Oid ColumnarStripeIndexRelationId(void);
|
||||
|
@ -151,11 +149,10 @@ typedef FormData_columnar_options *Form_columnar_options;
|
|||
#define Anum_columnar_stripe_chunk_count 8
|
||||
|
||||
/* constants for columnar.chunk_group */
|
||||
#define Natts_columnar_chunkgroup 4
|
||||
#define Natts_columnar_chunkgroup 3
|
||||
#define Anum_columnar_chunkgroup_storageid 1
|
||||
#define Anum_columnar_chunkgroup_stripe 2
|
||||
#define Anum_columnar_chunkgroup_chunk 3
|
||||
#define Anum_columnar_chunkgroup_row_count 4
|
||||
|
||||
/* constants for columnar.chunk */
|
||||
#define Natts_columnar_chunk 14
|
||||
|
@ -485,24 +482,19 @@ SaveStripeSkipList(RelFileNode relfilenode, uint64 stripe, StripeSkipList *chunk
|
|||
*/
|
||||
void
|
||||
SaveChunkGroups(RelFileNode relfilenode, uint64 stripe,
|
||||
List *chunkGroupRowCounts)
|
||||
uint32 stripeChunkCount)
|
||||
{
|
||||
ColumnarMetapage *metapage = ReadMetapage(relfilenode, false);
|
||||
Oid columnarChunkGroupOid = ColumnarChunkGroupRelationId();
|
||||
Relation columnarChunkGroup = table_open(columnarChunkGroupOid, RowExclusiveLock);
|
||||
ModifyState *modifyState = StartModifyRelation(columnarChunkGroup);
|
||||
|
||||
ListCell *lc = NULL;
|
||||
int chunkId = 0;
|
||||
|
||||
foreach(lc, chunkGroupRowCounts)
|
||||
for (uint32 chunkId = 0; chunkId < stripeChunkCount; chunkId++)
|
||||
{
|
||||
int64 rowCount = lfirst_int(lc);
|
||||
Datum values[Natts_columnar_chunkgroup] = {
|
||||
UInt64GetDatum(metapage->storageId),
|
||||
Int64GetDatum(stripe),
|
||||
Int32GetDatum(chunkId),
|
||||
Int64GetDatum(rowCount)
|
||||
Int32GetDatum(chunkId)
|
||||
};
|
||||
|
||||
bool nulls[Natts_columnar_chunkgroup] = { false };
|
||||
|
@ -518,6 +510,8 @@ SaveChunkGroups(RelFileNode relfilenode, uint64 stripe,
|
|||
}
|
||||
|
||||
|
||||
static StripeMetadata * GetStripeById(uint64 storageId, uint64 stripeId, Snapshot snapshot);
|
||||
|
||||
/*
|
||||
* ReadStripeSkipList fetches chunk metadata for a given stripe.
|
||||
*/
|
||||
|
@ -623,72 +617,12 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri
|
|||
index_close(index, AccessShareLock);
|
||||
table_close(columnarChunk, AccessShareLock);
|
||||
|
||||
chunkList->chunkGroupRowCounts =
|
||||
ReadChunkGroupRowCounts(metapage->storageId, stripe, chunkCount);
|
||||
chunkList->chunkGroupRowCount = GetStripeById(metapage->storageId, stripe, NULL)->chunkGroupRowCount;
|
||||
|
||||
return chunkList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ReadChunkGroupRowCounts returns an array of row counts of chunk groups for the
|
||||
* given stripe.
|
||||
*/
|
||||
static uint32 *
|
||||
ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount)
|
||||
{
|
||||
Oid columnarChunkGroupOid = ColumnarChunkGroupRelationId();
|
||||
Relation columnarChunkGroup = table_open(columnarChunkGroupOid, AccessShareLock);
|
||||
Relation index = index_open(ColumnarChunkGroupIndexRelationId(), AccessShareLock);
|
||||
|
||||
ScanKeyData scanKey[2];
|
||||
ScanKeyInit(&scanKey[0], Anum_columnar_chunkgroup_storageid,
|
||||
BTEqualStrategyNumber, F_OIDEQ, UInt64GetDatum(storageId));
|
||||
ScanKeyInit(&scanKey[1], Anum_columnar_chunkgroup_stripe,
|
||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe));
|
||||
|
||||
SysScanDesc scanDescriptor =
|
||||
systable_beginscan_ordered(columnarChunkGroup, index, NULL, 2, scanKey);
|
||||
|
||||
uint32 chunkGroupIndex = 0;
|
||||
HeapTuple heapTuple = NULL;
|
||||
uint32 *chunkGroupRowCounts = palloc0(chunkGroupCount * sizeof(uint32));
|
||||
|
||||
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
|
||||
{
|
||||
Datum datumArray[Natts_columnar_chunkgroup];
|
||||
bool isNullArray[Natts_columnar_chunkgroup];
|
||||
|
||||
heap_deform_tuple(heapTuple,
|
||||
RelationGetDescr(columnarChunkGroup),
|
||||
datumArray, isNullArray);
|
||||
|
||||
uint32 tupleChunkGroupIndex =
|
||||
DatumGetUInt32(datumArray[Anum_columnar_chunkgroup_chunk - 1]);
|
||||
if (chunkGroupIndex >= chunkGroupCount ||
|
||||
tupleChunkGroupIndex != chunkGroupIndex)
|
||||
{
|
||||
elog(ERROR, "unexpected chunk group");
|
||||
}
|
||||
|
||||
chunkGroupRowCounts[chunkGroupIndex] =
|
||||
(uint32) DatumGetUInt64(datumArray[Anum_columnar_chunkgroup_row_count - 1]);
|
||||
chunkGroupIndex++;
|
||||
}
|
||||
|
||||
if (chunkGroupIndex != chunkGroupCount)
|
||||
{
|
||||
elog(ERROR, "unexpected chunk group count");
|
||||
}
|
||||
|
||||
systable_endscan_ordered(scanDescriptor);
|
||||
index_close(index, AccessShareLock);
|
||||
table_close(columnarChunkGroup, AccessShareLock);
|
||||
|
||||
return chunkGroupRowCounts;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* InsertStripeMetadataRow adds a row to columnar.stripe.
|
||||
*/
|
||||
|
@ -896,6 +830,24 @@ ReserveStripe(Relation rel, uint64 sizeBytes,
|
|||
return stripe;
|
||||
}
|
||||
|
||||
#include "distributed/listutils.h"
|
||||
|
||||
static StripeMetadata *
|
||||
GetStripeById(uint64 storageId, uint64 stripeId, Snapshot snapshot)
|
||||
{
|
||||
List *stripeList = ReadDataFileStripeList(storageId, snapshot);
|
||||
StripeMetadata *stripeMetadata = NULL;
|
||||
foreach_ptr(stripeMetadata, stripeList)
|
||||
{
|
||||
if (stripeMetadata->id == stripeId)
|
||||
{
|
||||
return stripeMetadata;
|
||||
}
|
||||
}
|
||||
|
||||
ereport(ERROR, (errmsg("Not expected")));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* ReadDataFileStripeList reads the stripe list for a given storageId
|
||||
|
|
|
@ -352,8 +352,7 @@ static ChunkGroupReadState *
|
|||
BeginChunkGroupRead(StripeBuffers *stripeBuffers, int chunkIndex, TupleDesc tupleDesc,
|
||||
List *projectedColumnList, MemoryContext cxt)
|
||||
{
|
||||
uint32 chunkGroupRowCount =
|
||||
stripeBuffers->selectedChunkGroupRowCounts[chunkIndex];
|
||||
uint32 chunkGroupRowCount = stripeBuffers->stripeChunkGroupRowCount;
|
||||
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(cxt);
|
||||
|
||||
|
@ -577,8 +576,8 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
|||
stripeBuffers->columnCount = columnCount;
|
||||
stripeBuffers->rowCount = StripeSkipListRowCount(selectedChunkSkipList);
|
||||
stripeBuffers->columnBuffersArray = columnBuffersArray;
|
||||
stripeBuffers->selectedChunkGroupRowCounts =
|
||||
selectedChunkSkipList->chunkGroupRowCounts;
|
||||
stripeBuffers->stripeChunkGroupRowCount =
|
||||
selectedChunkSkipList->chunkGroupRowCount;
|
||||
|
||||
return stripeBuffers;
|
||||
}
|
||||
|
@ -899,22 +898,11 @@ SelectedChunkSkipList(StripeSkipList *stripeSkipList, bool *projectedColumnMask,
|
|||
}
|
||||
}
|
||||
|
||||
selectedChunkIndex = 0;
|
||||
uint32 *chunkGroupRowCounts = palloc0(selectedChunkCount * sizeof(uint32));
|
||||
for (chunkIndex = 0; chunkIndex < stripeSkipList->chunkCount; chunkIndex++)
|
||||
{
|
||||
if (selectedChunkMask[chunkIndex])
|
||||
{
|
||||
chunkGroupRowCounts[selectedChunkIndex++] =
|
||||
stripeSkipList->chunkGroupRowCounts[chunkIndex];
|
||||
}
|
||||
}
|
||||
|
||||
StripeSkipList *selectedChunkSkipList = palloc0(sizeof(StripeSkipList));
|
||||
selectedChunkSkipList->chunkSkipNodeArray = selectedChunkSkipNodeArray;
|
||||
selectedChunkSkipList->chunkCount = selectedChunkCount;
|
||||
selectedChunkSkipList->columnCount = stripeSkipList->columnCount;
|
||||
selectedChunkSkipList->chunkGroupRowCounts = chunkGroupRowCounts;
|
||||
selectedChunkSkipList->chunkGroupRowCount = stripeSkipList->chunkGroupRowCount;
|
||||
|
||||
return selectedChunkSkipList;
|
||||
}
|
||||
|
@ -930,11 +918,11 @@ StripeSkipListRowCount(StripeSkipList *stripeSkipList)
|
|||
{
|
||||
uint32 stripeSkipListRowCount = 0;
|
||||
uint32 chunkIndex = 0;
|
||||
uint32 *chunkGroupRowCounts = stripeSkipList->chunkGroupRowCounts;
|
||||
uint32 chunkGroupRowCount = stripeSkipList->chunkGroupRowCount;
|
||||
|
||||
/* TODO: multiplication */
|
||||
for (chunkIndex = 0; chunkIndex < stripeSkipList->chunkCount; chunkIndex++)
|
||||
{
|
||||
uint32 chunkGroupRowCount = chunkGroupRowCounts[chunkIndex];
|
||||
stripeSkipListRowCount += chunkGroupRowCount;
|
||||
}
|
||||
|
||||
|
|
|
@ -45,7 +45,7 @@ struct ColumnarWriteState
|
|||
ColumnarOptions options;
|
||||
ChunkData *chunkData;
|
||||
|
||||
List *chunkGroupRowCounts;
|
||||
uint32 chunkGroupRowCount;
|
||||
|
||||
/*
|
||||
* compressionBuffer buffer is used as temporary storage during
|
||||
|
@ -546,12 +546,12 @@ FlushStripe(ColumnarWriteState *writeState)
|
|||
|
||||
SaveChunkGroups(writeState->relfilenode,
|
||||
stripeMetadata.id,
|
||||
writeState->chunkGroupRowCounts);
|
||||
stripeSkipList->chunkCount);
|
||||
SaveStripeSkipList(writeState->relfilenode,
|
||||
stripeMetadata.id,
|
||||
stripeSkipList, tupleDescriptor);
|
||||
|
||||
writeState->chunkGroupRowCounts = NIL;
|
||||
writeState->chunkGroupRowCount = 0;
|
||||
|
||||
relation_close(relation, NoLock);
|
||||
}
|
||||
|
@ -640,8 +640,7 @@ SerializeChunkData(ColumnarWriteState *writeState, uint32 chunkIndex, uint32 row
|
|||
const uint32 columnCount = stripeBuffers->columnCount;
|
||||
StringInfo compressionBuffer = writeState->compressionBuffer;
|
||||
|
||||
writeState->chunkGroupRowCounts =
|
||||
lappend_int(writeState->chunkGroupRowCounts, rowCount);
|
||||
writeState->chunkGroupRowCount = rowCount;
|
||||
|
||||
/* serialize exist values, data values are already serialized */
|
||||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||
|
|
|
@ -16,3 +16,5 @@ ALTER TABLE columnar.chunk_group DROP CONSTRAINT chunk_group_storage_id_fkey;
|
|||
$$;
|
||||
END IF;
|
||||
END$proc$;
|
||||
|
||||
ALTER TABLE columnar.chunk_group DROP COLUMN row_count;
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
/* columnar--10.1-1--10.0-3.sql */
|
||||
|
||||
-- TODO: populate row_count again and make it NOT NULL, or ... ?
|
||||
ALTER TABLE columnar.chunk_group ADD COLUMN row_count bigint;
|
||||
|
||||
-- define foreign keys between columnar metadata tables
|
||||
ALTER TABLE columnar.chunk
|
||||
ADD FOREIGN KEY (storage_id, stripe_num, chunk_group_num)
|
||||
|
|
|
@ -112,7 +112,7 @@ typedef struct ColumnChunkSkipNode
|
|||
typedef struct StripeSkipList
|
||||
{
|
||||
ColumnChunkSkipNode **chunkSkipNodeArray;
|
||||
uint32 *chunkGroupRowCounts;
|
||||
uint32 chunkGroupRowCount;
|
||||
uint32 columnCount;
|
||||
uint32 chunkCount;
|
||||
} StripeSkipList;
|
||||
|
@ -175,7 +175,7 @@ typedef struct StripeBuffers
|
|||
uint32 rowCount;
|
||||
ColumnBuffers **columnBuffersArray;
|
||||
|
||||
uint32 *selectedChunkGroupRowCounts;
|
||||
uint32 stripeChunkGroupRowCount;
|
||||
} StripeBuffers;
|
||||
|
||||
|
||||
|
@ -248,7 +248,7 @@ extern void SaveStripeSkipList(RelFileNode relfilenode, uint64 stripe,
|
|||
StripeSkipList *stripeSkipList,
|
||||
TupleDesc tupleDescriptor);
|
||||
extern void SaveChunkGroups(RelFileNode relfilenode, uint64 stripe,
|
||||
List *chunkGroupRowCounts);
|
||||
uint32 stripeChunkCount);
|
||||
extern StripeSkipList * ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe,
|
||||
TupleDesc tupleDescriptor,
|
||||
uint32 chunkCount);
|
||||
|
|
Loading…
Reference in New Issue