mirror of https://github.com/citusdata/citus.git
refactor projected columns to Bitmapset
parent
32421449c1
commit
a3842d095c
|
@ -383,13 +383,6 @@ ColumnarAttrNeeded(ScanState *ss)
|
|||
{
|
||||
Var *var = lfirst(lc);
|
||||
|
||||
if (var->varattno < 0)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg(
|
||||
"UPDATE and CTID scans not supported for ColumnarScan")));
|
||||
}
|
||||
|
||||
if (var->varattno == 0)
|
||||
{
|
||||
elog(DEBUG1, "Need attribute: all");
|
||||
|
|
|
@ -43,7 +43,7 @@ typedef struct ChunkGroupReadState
|
|||
int64 currentRow;
|
||||
int64 rowCount;
|
||||
int columnCount;
|
||||
List *projectedColumnList; /* borrowed reference */
|
||||
Bitmapset *projectedColumnList; /* borrowed reference */
|
||||
ChunkData *chunkGroupData;
|
||||
} ChunkGroupReadState;
|
||||
|
||||
|
@ -58,7 +58,7 @@ typedef struct StripeReadState
|
|||
int64 chunkGroupsFiltered;
|
||||
MemoryContext stripeReadContext;
|
||||
StripeBuffers *stripeBuffers; /* allocated in stripeReadContext */
|
||||
List *projectedColumnList; /* borrowed reference */
|
||||
Bitmapset *projectedColumnList; /* borrowed reference */
|
||||
ChunkGroupReadState *chunkGroupReadState; /* owned */
|
||||
} StripeReadState;
|
||||
|
||||
|
@ -75,7 +75,7 @@ struct ColumnarReadState
|
|||
* Integer list of attribute numbers (1-indexed) for columns needed by the
|
||||
* query.
|
||||
*/
|
||||
List *projectedColumnList;
|
||||
Bitmapset *projectedColumnList;
|
||||
|
||||
List *whereClauseList;
|
||||
List *whereClauseVars;
|
||||
|
@ -97,7 +97,8 @@ static void ReadChunkGroupRowByRowOffset(ChunkGroupReadState *chunkGroupReadStat
|
|||
static bool StripeReadInProgress(ColumnarReadState *readState);
|
||||
static bool HasUnreadStripe(ColumnarReadState *readState);
|
||||
static StripeReadState * BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel,
|
||||
TupleDesc tupleDesc, List *projectedColumnList,
|
||||
TupleDesc tupleDesc,
|
||||
Bitmapset *projectedColumnList,
|
||||
List *whereClauseList, List *whereClauseVars,
|
||||
MemoryContext stripeReadContext);
|
||||
static void EndStripeRead(StripeReadState *stripeReadState);
|
||||
|
@ -107,7 +108,7 @@ static bool ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnVal
|
|||
static ChunkGroupReadState * BeginChunkGroupRead(StripeBuffers *stripeBuffers, int
|
||||
chunkIndex,
|
||||
TupleDesc tupleDesc,
|
||||
List *projectedColumnList,
|
||||
Bitmapset *projectedColumnList,
|
||||
MemoryContext cxt);
|
||||
static void EndChunkGroupRead(ChunkGroupReadState *chunkGroupReadState);
|
||||
static bool ReadChunkGroupNextRow(ChunkGroupReadState *chunkGroupReadState,
|
||||
|
@ -116,7 +117,7 @@ static bool ReadChunkGroupNextRow(ChunkGroupReadState *chunkGroupReadState,
|
|||
static StripeBuffers * LoadFilteredStripeBuffers(Relation relation,
|
||||
StripeMetadata *stripeMetadata,
|
||||
TupleDesc tupleDescriptor,
|
||||
List *projectedColumnList,
|
||||
Bitmapset *projectedColumnList,
|
||||
List *whereClauseList,
|
||||
List *whereClauseVars,
|
||||
int64 *chunkGroupsFiltered);
|
||||
|
@ -133,10 +134,9 @@ static OpExpr * MakeOpExpression(Var *variable, int16 strategyNumber);
|
|||
static Oid GetOperatorByType(Oid typeId, Oid accessMethodId, int16 strategyNumber);
|
||||
static void UpdateConstraint(Node *baseConstraint, Datum minValue, Datum maxValue);
|
||||
static StripeSkipList * SelectedChunkSkipList(StripeSkipList *stripeSkipList,
|
||||
bool *projectedColumnMask,
|
||||
Bitmapset *projectedColumnMask,
|
||||
bool *selectedChunkMask);
|
||||
static uint32 StripeSkipListRowCount(StripeSkipList *stripeSkipList);
|
||||
static bool * ProjectedColumnMask(uint32 columnCount, List *projectedColumnList);
|
||||
static void DeserializeBoolArray(StringInfo boolArrayBuffer, bool *boolArray,
|
||||
uint32 boolArrayLength);
|
||||
static void DeserializeDatumArray(StringInfo datumBuffer, bool *existsArray,
|
||||
|
@ -145,7 +145,7 @@ static void DeserializeDatumArray(StringInfo datumBuffer, bool *existsArray,
|
|||
Datum *datumArray);
|
||||
static ChunkData * DeserializeChunkData(StripeBuffers *stripeBuffers, uint64 chunkIndex,
|
||||
uint32 rowCount, TupleDesc tupleDescriptor,
|
||||
List *projectedColumnList);
|
||||
Bitmapset *projectedColumnList);
|
||||
static Datum ColumnDefaultValue(TupleConstr *tupleConstraints,
|
||||
Form_pg_attribute attributeForm);
|
||||
|
||||
|
@ -157,7 +157,7 @@ static Datum ColumnDefaultValue(TupleConstr *tupleConstraints,
|
|||
*/
|
||||
ColumnarReadState *
|
||||
ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
|
||||
List *projectedColumnList, List *whereClauseList)
|
||||
Bitmapset *projectedColumnList, List *whereClauseList)
|
||||
{
|
||||
List *stripeList = StripesForRelfilenode(relation->rd_node);
|
||||
StripeMetadata *stripeMetadata = NULL;
|
||||
|
@ -259,7 +259,7 @@ ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *col
|
|||
*/
|
||||
bool
|
||||
ColumnarReadRowByRowNumber(Relation relation, uint64 rowNumber,
|
||||
List *neededColumnList, Datum *columnValues,
|
||||
Bitmapset *neededColumnList, Datum *columnValues,
|
||||
bool *columnNulls, Snapshot snapshot)
|
||||
{
|
||||
StripeMetadata *stripeMetadata = FindStripeByRowNumber(relation, rowNumber, snapshot);
|
||||
|
@ -402,7 +402,8 @@ ColumnarEndRead(ColumnarReadState *readState)
|
|||
*/
|
||||
static StripeReadState *
|
||||
BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDesc,
|
||||
List *projectedColumnList, List *whereClauseList, List *whereClauseVars,
|
||||
Bitmapset *projectedColumnList, List *whereClauseList,
|
||||
List *whereClauseVars,
|
||||
MemoryContext stripeReadContext)
|
||||
{
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(stripeReadContext);
|
||||
|
@ -519,7 +520,7 @@ ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues,
|
|||
*/
|
||||
static ChunkGroupReadState *
|
||||
BeginChunkGroupRead(StripeBuffers *stripeBuffers, int chunkIndex, TupleDesc tupleDesc,
|
||||
List *projectedColumnList, MemoryContext cxt)
|
||||
Bitmapset *projectedColumnList, MemoryContext cxt)
|
||||
{
|
||||
uint32 chunkGroupRowCount =
|
||||
stripeBuffers->selectedChunkGroupRowCounts[chunkIndex];
|
||||
|
@ -577,15 +578,23 @@ ReadChunkGroupNextRow(ChunkGroupReadState *chunkGroupReadState, Datum *columnVal
|
|||
*/
|
||||
memset(columnNulls, true, sizeof(bool) * chunkGroupReadState->columnCount);
|
||||
|
||||
int attno;
|
||||
foreach_int(attno, chunkGroupReadState->projectedColumnList)
|
||||
int i = -1;
|
||||
while ((i = bms_next_member(chunkGroupReadState->projectedColumnList, i)) >= 0)
|
||||
{
|
||||
int attno = i + FirstLowInvalidHeapAttributeNumber;
|
||||
|
||||
const ChunkData *chunkGroupData = chunkGroupReadState->chunkGroupData;
|
||||
const int rowIndex = chunkGroupReadState->currentRow;
|
||||
|
||||
/* attno is 1-indexed; existsArray is 0-indexed */
|
||||
const uint32 columnIndex = attno - 1;
|
||||
|
||||
/* skip system columns */
|
||||
if (attno < 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (chunkGroupData->existsArray[columnIndex][rowIndex])
|
||||
{
|
||||
columnValues[columnIndex] = chunkGroupData->valueArray[columnIndex][rowIndex];
|
||||
|
@ -615,9 +624,9 @@ ColumnarReadChunkGroupsFiltered(ColumnarReadState *state)
|
|||
* value arrays for requested columns in columnMask.
|
||||
*/
|
||||
ChunkData *
|
||||
CreateEmptyChunkData(uint32 columnCount, bool *columnMask, uint32 chunkGroupRowCount)
|
||||
CreateEmptyChunkData(uint32 columnCount, Bitmapset *columnMask, uint32 chunkGroupRowCount)
|
||||
{
|
||||
uint32 columnIndex = 0;
|
||||
int columnIndex = 0;
|
||||
|
||||
ChunkData *chunkData = palloc0(sizeof(ChunkData));
|
||||
chunkData->existsArray = palloc0(columnCount * sizeof(bool *));
|
||||
|
@ -629,7 +638,8 @@ CreateEmptyChunkData(uint32 columnCount, bool *columnMask, uint32 chunkGroupRowC
|
|||
/* allocate chunk memory for deserialized data */
|
||||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||
{
|
||||
if (columnMask[columnIndex])
|
||||
if (bms_is_member(columnIndex + 1 - FirstLowInvalidHeapAttributeNumber,
|
||||
columnMask))
|
||||
{
|
||||
chunkData->existsArray[columnIndex] = palloc0(chunkGroupRowCount *
|
||||
sizeof(bool));
|
||||
|
@ -703,15 +713,13 @@ ColumnarTableRowCount(Relation relation)
|
|||
*/
|
||||
static StripeBuffers *
|
||||
LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
||||
TupleDesc tupleDescriptor, List *projectedColumnList,
|
||||
TupleDesc tupleDescriptor, Bitmapset *projectedColumnList,
|
||||
List *whereClauseList, List *whereClauseVars,
|
||||
int64 *chunkGroupsFiltered)
|
||||
{
|
||||
uint32 columnIndex = 0;
|
||||
uint32 columnCount = tupleDescriptor->natts;
|
||||
|
||||
bool *projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList);
|
||||
|
||||
StripeSkipList *stripeSkipList = ReadStripeSkipList(relation->rd_node,
|
||||
stripeMetadata->id,
|
||||
tupleDescriptor,
|
||||
|
@ -721,15 +729,15 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
|||
whereClauseVars, chunkGroupsFiltered);
|
||||
|
||||
StripeSkipList *selectedChunkSkipList =
|
||||
SelectedChunkSkipList(stripeSkipList, projectedColumnMask,
|
||||
selectedChunkMask);
|
||||
SelectedChunkSkipList(stripeSkipList, projectedColumnList, selectedChunkMask);
|
||||
|
||||
/* load column data for projected columns */
|
||||
ColumnBuffers **columnBuffersArray = palloc0(columnCount * sizeof(ColumnBuffers *));
|
||||
|
||||
for (columnIndex = 0; columnIndex < stripeMetadata->columnCount; columnIndex++)
|
||||
{
|
||||
if (projectedColumnMask[columnIndex])
|
||||
if (bms_is_member(columnIndex + 1 - FirstLowInvalidHeapAttributeNumber,
|
||||
projectedColumnList))
|
||||
{
|
||||
ColumnChunkSkipNode *chunkSkipNode =
|
||||
selectedChunkSkipList->chunkSkipNodeArray[columnIndex];
|
||||
|
@ -1083,7 +1091,7 @@ UpdateConstraint(Node *baseConstraint, Datum minValue, Datum maxValue)
|
|||
* non-selected chunks are removed from the given stripeSkipList.
|
||||
*/
|
||||
static StripeSkipList *
|
||||
SelectedChunkSkipList(StripeSkipList *stripeSkipList, bool *projectedColumnMask,
|
||||
SelectedChunkSkipList(StripeSkipList *stripeSkipList, Bitmapset *projectedColumnMask,
|
||||
bool *selectedChunkMask)
|
||||
{
|
||||
uint32 selectedChunkCount = 0;
|
||||
|
@ -1109,7 +1117,8 @@ SelectedChunkSkipList(StripeSkipList *stripeSkipList, bool *projectedColumnMask,
|
|||
selectedChunkIndex = 0;
|
||||
|
||||
/* first column's chunk skip node is always read */
|
||||
if (!projectedColumnMask[columnIndex] && !firstColumn)
|
||||
if (!bms_is_member(columnIndex + 1 - FirstLowInvalidHeapAttributeNumber,
|
||||
projectedColumnMask) && !firstColumn)
|
||||
{
|
||||
selectedChunkSkipNodeArray[columnIndex] = NULL;
|
||||
continue;
|
||||
|
@ -1174,27 +1183,6 @@ StripeSkipListRowCount(StripeSkipList *stripeSkipList)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ProjectedColumnMask returns a boolean array in which the projected columns
|
||||
* from the projected column list are marked as true.
|
||||
*/
|
||||
static bool *
|
||||
ProjectedColumnMask(uint32 columnCount, List *projectedColumnList)
|
||||
{
|
||||
bool *projectedColumnMask = palloc0(columnCount * sizeof(bool));
|
||||
int attno;
|
||||
|
||||
foreach_int(attno, projectedColumnList)
|
||||
{
|
||||
/* attno is 1-indexed; projectedColumnMask is 0-indexed */
|
||||
int columnIndex = attno - 1;
|
||||
projectedColumnMask[columnIndex] = true;
|
||||
}
|
||||
|
||||
return projectedColumnMask;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DeserializeBoolArray reads an array of bits from the given buffer and stores
|
||||
* it in provided bool array.
|
||||
|
@ -1279,11 +1267,11 @@ DeserializeDatumArray(StringInfo datumBuffer, bool *existsArray, uint32 datumCou
|
|||
static ChunkData *
|
||||
DeserializeChunkData(StripeBuffers *stripeBuffers, uint64 chunkIndex,
|
||||
uint32 rowCount, TupleDesc tupleDescriptor,
|
||||
List *projectedColumnList)
|
||||
Bitmapset *projectedColumnList)
|
||||
{
|
||||
int columnIndex = 0;
|
||||
bool *columnMask = ProjectedColumnMask(tupleDescriptor->natts, projectedColumnList);
|
||||
ChunkData *chunkData = CreateEmptyChunkData(tupleDescriptor->natts, columnMask,
|
||||
ChunkData *chunkData = CreateEmptyChunkData(tupleDescriptor->natts,
|
||||
projectedColumnList,
|
||||
rowCount);
|
||||
|
||||
for (columnIndex = 0; columnIndex < stripeBuffers->columnCount; columnIndex++)
|
||||
|
@ -1292,7 +1280,9 @@ DeserializeChunkData(StripeBuffers *stripeBuffers, uint64 chunkIndex,
|
|||
ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex];
|
||||
bool columnAdded = false;
|
||||
|
||||
if (columnBuffers == NULL && columnMask[columnIndex])
|
||||
if (columnBuffers == NULL &&
|
||||
bms_is_member(columnIndex + 1 - FirstLowInvalidHeapAttributeNumber,
|
||||
projectedColumnList))
|
||||
{
|
||||
columnAdded = true;
|
||||
}
|
||||
|
|
|
@ -105,7 +105,7 @@ static void ColumnarProcessUtility(PlannedStmt *pstmt,
|
|||
QueryCompletionCompat *completionTag);
|
||||
static bool ConditionalLockRelationWithTimeout(Relation rel, LOCKMODE lockMode,
|
||||
int timeout, int retryInterval);
|
||||
static List * NeededColumnsList(TupleDesc tupdesc, Bitmapset *attr_needed);
|
||||
static void ErrorOnUnsupportedColumns(TupleDesc tupdesc, Bitmapset *attr_needed);
|
||||
static void LogRelationStats(Relation rel, int elevel);
|
||||
static void TruncateColumnar(Relation rel, int elevel);
|
||||
static HeapTuple ColumnarSlotCopyHeapTuple(TupleTableSlot *slot);
|
||||
|
@ -225,8 +225,8 @@ static ColumnarReadState *
|
|||
init_columnar_read_state(Relation relation, TupleDesc tupdesc, Bitmapset *attr_needed,
|
||||
List *scanQual)
|
||||
{
|
||||
List *neededColumnList = NeededColumnsList(tupdesc, attr_needed);
|
||||
ColumnarReadState *readState = ColumnarBeginRead(relation, tupdesc, neededColumnList,
|
||||
ErrorOnUnsupportedColumns(tupdesc, attr_needed);
|
||||
ColumnarReadState *readState = ColumnarBeginRead(relation, tupdesc, attr_needed,
|
||||
scanQual);
|
||||
|
||||
return readState;
|
||||
|
@ -442,9 +442,9 @@ columnar_index_fetch_tuple(struct IndexFetchTableData *scan,
|
|||
|
||||
|
||||
TupleDesc relationTupleDesc = RelationGetDescr(scan->rel);
|
||||
List *relationColumnList = NeededColumnsList(relationTupleDesc, attr_needed);
|
||||
ErrorOnUnsupportedColumns(relationTupleDesc, attr_needed);
|
||||
uint64 rowNumber = tid_to_row_number(*tid);
|
||||
if (!ColumnarReadRowByRowNumber(scan->rel, rowNumber, relationColumnList,
|
||||
if (!ColumnarReadRowByRowNumber(scan->rel, rowNumber, attr_needed,
|
||||
slot->tts_values, slot->tts_isnull, snapshot))
|
||||
{
|
||||
return false;
|
||||
|
@ -734,9 +734,9 @@ columnar_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
|||
int natts = OldHeap->rd_att->natts;
|
||||
Bitmapset *attr_needed = bms_add_range(NULL, 1 - FirstLowInvalidHeapAttributeNumber,
|
||||
natts - FirstLowInvalidHeapAttributeNumber);
|
||||
List *projectedColumnList = NeededColumnsList(sourceDesc, attr_needed);
|
||||
ErrorOnUnsupportedColumns(sourceDesc, attr_needed);
|
||||
ColumnarReadState *readState = ColumnarBeginRead(OldHeap, sourceDesc,
|
||||
projectedColumnList,
|
||||
attr_needed,
|
||||
NULL);
|
||||
|
||||
Datum *values = palloc0(sourceDesc->natts * sizeof(Datum));
|
||||
|
@ -762,27 +762,26 @@ columnar_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
|||
* NeededColumnsList returns a list of AttrNumber's for the columns that
|
||||
* are not dropped and specified by attr_needed.
|
||||
*/
|
||||
static List *
|
||||
NeededColumnsList(TupleDesc tupdesc, Bitmapset *attr_needed)
|
||||
static void
|
||||
ErrorOnUnsupportedColumns(TupleDesc tupdesc, Bitmapset *attr_needed)
|
||||
{
|
||||
List *columnList = NIL;
|
||||
|
||||
for (int i = 0; i < tupdesc->natts; i++)
|
||||
int i = -1;
|
||||
while ((i = bms_next_member(attr_needed, i)) >= 0)
|
||||
{
|
||||
if (tupdesc->attrs[i].attisdropped)
|
||||
AttrNumber varattno = i + FirstLowInvalidHeapAttributeNumber;
|
||||
if (varattno < 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
if (varattno != SelfItemPointerAttributeNumber)
|
||||
{
|
||||
/* TODO: give error messages specific to the system columns we dont support*/
|
||||
|
||||
/* attr_needed is 0-indexed but columnList is 1-indexed */
|
||||
AttrNumber varattno = tupdesc->attrs[i].attnum;
|
||||
if (bms_is_member(varattno - FirstLowInvalidHeapAttributeNumber, attr_needed))
|
||||
{
|
||||
columnList = lappend_int(columnList, varattno);
|
||||
/*
|
||||
* ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
* errmsg("system column not supported by columnar")));
|
||||
*/
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return columnList;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -117,8 +117,9 @@ ColumnarBeginWrite(RelFileNode relfilenode,
|
|||
"Stripe Write Memory Context",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
|
||||
bool *columnMaskArray = palloc(columnCount * sizeof(bool));
|
||||
memset(columnMaskArray, true, columnCount);
|
||||
Bitmapset *columnMaskArray = NULL;
|
||||
bms_add_range(columnMaskArray, 1 - FirstLowInvalidHeapAttributeNumber,
|
||||
columnCount - FirstLowInvalidHeapAttributeNumber);
|
||||
|
||||
ChunkData *chunkData = CreateEmptyChunkData(columnCount, columnMaskArray,
|
||||
options.chunkRowCount);
|
||||
|
|
|
@ -211,13 +211,13 @@ extern MemoryContext ColumnarWritePerTupleContext(ColumnarWriteState *state);
|
|||
/* Function declarations for reading from columnar table */
|
||||
extern ColumnarReadState * ColumnarBeginRead(Relation relation,
|
||||
TupleDesc tupleDescriptor,
|
||||
List *projectedColumnList,
|
||||
Bitmapset *projectedColumnList,
|
||||
List *qualConditions);
|
||||
extern bool ColumnarReadNextRow(ColumnarReadState *state, Datum *columnValues,
|
||||
bool *columnNulls, uint64 *rowNumber);
|
||||
extern void ColumnarRescan(ColumnarReadState *readState);
|
||||
extern bool ColumnarReadRowByRowNumber(Relation relation, uint64 rowNumber,
|
||||
List *neededColumnList, Datum *columnValues,
|
||||
Bitmapset *neededColumnList, Datum *columnValues,
|
||||
bool *columnNulls, Snapshot snapshot);
|
||||
extern void ColumnarEndRead(ColumnarReadState *state);
|
||||
extern int64 ColumnarReadChunkGroupsFiltered(ColumnarReadState *state);
|
||||
|
@ -225,7 +225,7 @@ extern int64 ColumnarReadChunkGroupsFiltered(ColumnarReadState *state);
|
|||
/* Function declarations for common functions */
|
||||
extern FmgrInfo * GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId,
|
||||
int16 procedureId);
|
||||
extern ChunkData * CreateEmptyChunkData(uint32 columnCount, bool *columnMask,
|
||||
extern ChunkData * CreateEmptyChunkData(uint32 columnCount, Bitmapset *columnMask,
|
||||
uint32 chunkGroupRowCount);
|
||||
extern void FreeChunkData(ChunkData *chunkData);
|
||||
extern uint64 ColumnarTableRowCount(Relation relation);
|
||||
|
|
Loading…
Reference in New Issue