mirror of https://github.com/citusdata/citus.git
Use correct snapshot when reading a columnar table
Instead of using xact snapshot, use the snapshot provided to columnarAM when scanning table.pull/5154/head
parent
bd91df298f
commit
0b4ed075b5
|
@ -88,7 +88,7 @@ static void GetHighestUsedAddressAndId(uint64 storageId,
|
|||
static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot);
|
||||
static StripeMetadata * BuildStripeMetadata(Datum *datumArray);
|
||||
static uint32 * ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32
|
||||
chunkGroupCount);
|
||||
chunkGroupCount, Snapshot snapshot);
|
||||
static Oid ColumnarStorageIdSequenceRelationId(void);
|
||||
static Oid ColumnarStripeRelationId(void);
|
||||
static Oid ColumnarStripePKeyIndexRelationId(void);
|
||||
|
@ -532,7 +532,7 @@ SaveChunkGroups(RelFileNode relfilenode, uint64 stripe,
|
|||
*/
|
||||
StripeSkipList *
|
||||
ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescriptor,
|
||||
uint32 chunkCount)
|
||||
uint32 chunkCount, Snapshot snapshot)
|
||||
{
|
||||
int32 columnIndex = 0;
|
||||
HeapTuple heapTuple = NULL;
|
||||
|
@ -550,8 +550,8 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri
|
|||
ScanKeyInit(&scanKey[1], Anum_columnar_chunk_stripe,
|
||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe));
|
||||
|
||||
SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarChunk, index, NULL,
|
||||
2, scanKey);
|
||||
SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarChunk, index,
|
||||
snapshot, 2, scanKey);
|
||||
|
||||
StripeSkipList *chunkList = palloc0(sizeof(StripeSkipList));
|
||||
chunkList->chunkCount = chunkCount;
|
||||
|
@ -634,7 +634,7 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri
|
|||
table_close(columnarChunk, AccessShareLock);
|
||||
|
||||
chunkList->chunkGroupRowCounts =
|
||||
ReadChunkGroupRowCounts(storageId, stripe, chunkCount);
|
||||
ReadChunkGroupRowCounts(storageId, stripe, chunkCount, snapshot);
|
||||
|
||||
return chunkList;
|
||||
}
|
||||
|
@ -803,7 +803,8 @@ FindStripeWithHighestRowNumber(Relation relation, Snapshot snapshot)
|
|||
* given stripe.
|
||||
*/
|
||||
static uint32 *
|
||||
ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount)
|
||||
ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount,
|
||||
Snapshot snapshot)
|
||||
{
|
||||
Oid columnarChunkGroupOid = ColumnarChunkGroupRelationId();
|
||||
Relation columnarChunkGroup = table_open(columnarChunkGroupOid, AccessShareLock);
|
||||
|
@ -816,7 +817,7 @@ ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount)
|
|||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe));
|
||||
|
||||
SysScanDesc scanDescriptor =
|
||||
systable_beginscan_ordered(columnarChunkGroup, index, NULL, 2, scanKey);
|
||||
systable_beginscan_ordered(columnarChunkGroup, index, snapshot, 2, scanKey);
|
||||
|
||||
uint32 chunkGroupIndex = 0;
|
||||
HeapTuple heapTuple = NULL;
|
||||
|
|
|
@ -88,6 +88,8 @@ struct ColumnarReadState
|
|||
* itself.
|
||||
*/
|
||||
MemoryContext scanContext;
|
||||
|
||||
Snapshot snapshot;
|
||||
};
|
||||
|
||||
/* static function declarations */
|
||||
|
@ -109,7 +111,8 @@ static bool HasUnreadStripe(ColumnarReadState *readState);
|
|||
static StripeReadState * BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel,
|
||||
TupleDesc tupleDesc, List *projectedColumnList,
|
||||
List *whereClauseList, List *whereClauseVars,
|
||||
MemoryContext stripeReadContext);
|
||||
MemoryContext stripeReadContext,
|
||||
Snapshot snapshot);
|
||||
static void AdvanceStripeRead(ColumnarReadState *readState);
|
||||
static bool ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues,
|
||||
bool *columnNulls);
|
||||
|
@ -128,7 +131,8 @@ static StripeBuffers * LoadFilteredStripeBuffers(Relation relation,
|
|||
List *projectedColumnList,
|
||||
List *whereClauseList,
|
||||
List *whereClauseVars,
|
||||
int64 *chunkGroupsFiltered);
|
||||
int64 *chunkGroupsFiltered,
|
||||
Snapshot snapshot);
|
||||
static ColumnBuffers * LoadColumnBuffers(Relation relation,
|
||||
ColumnChunkSkipNode *chunkSkipNodeArray,
|
||||
uint32 chunkCount, uint64 stripeOffset,
|
||||
|
@ -167,7 +171,7 @@ static Datum ColumnDefaultValue(TupleConstr *tupleConstraints,
|
|||
ColumnarReadState *
|
||||
ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
|
||||
List *projectedColumnList, List *whereClauseList,
|
||||
MemoryContext scanContext)
|
||||
MemoryContext scanContext, Snapshot snapshot)
|
||||
{
|
||||
/*
|
||||
* We allocate all stripe specific data in the stripeReadContext, and reset
|
||||
|
@ -187,8 +191,9 @@ ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
|
|||
readState->stripeReadState = NULL;
|
||||
readState->currentStripeMetadata = FindNextStripeByRowNumber(relation,
|
||||
COLUMNAR_INVALID_ROW_NUMBER,
|
||||
GetTransactionSnapshot());
|
||||
snapshot);
|
||||
readState->scanContext = scanContext;
|
||||
readState->snapshot = snapshot;
|
||||
|
||||
return readState;
|
||||
}
|
||||
|
@ -230,7 +235,8 @@ ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *col
|
|||
readState->projectedColumnList,
|
||||
readState->whereClauseList,
|
||||
readState->whereClauseVars,
|
||||
readState->stripeReadContext);
|
||||
readState->stripeReadContext,
|
||||
readState->snapshot);
|
||||
}
|
||||
|
||||
if (!ReadStripeNextRow(readState->stripeReadState, columnValues, columnNulls))
|
||||
|
@ -260,11 +266,12 @@ ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *col
|
|||
bool
|
||||
ColumnarReadRowByRowNumber(ColumnarReadState *readState,
|
||||
uint64 rowNumber, Datum *columnValues,
|
||||
bool *columnNulls, Snapshot snapshot)
|
||||
bool *columnNulls)
|
||||
{
|
||||
if (!ColumnarReadIsCurrentStripe(readState, rowNumber))
|
||||
{
|
||||
Relation columnarRelation = readState->relation;
|
||||
Snapshot snapshot = readState->snapshot;
|
||||
StripeMetadata *stripeMetadata = FindStripeByRowNumber(columnarRelation,
|
||||
rowNumber, snapshot);
|
||||
if (stripeMetadata == NULL)
|
||||
|
@ -286,7 +293,8 @@ ColumnarReadRowByRowNumber(ColumnarReadState *readState,
|
|||
readState->projectedColumnList,
|
||||
whereClauseList,
|
||||
whereClauseVars,
|
||||
stripeReadContext);
|
||||
stripeReadContext,
|
||||
snapshot);
|
||||
|
||||
readState->currentStripeMetadata = stripeMetadata;
|
||||
}
|
||||
|
@ -446,7 +454,7 @@ ColumnarRescan(ColumnarReadState *readState)
|
|||
ColumnarResetRead(readState);
|
||||
readState->currentStripeMetadata = FindNextStripeByRowNumber(readState->relation,
|
||||
COLUMNAR_INVALID_ROW_NUMBER,
|
||||
GetTransactionSnapshot());
|
||||
readState->snapshot);
|
||||
readState->chunkGroupsFiltered = 0;
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
@ -493,7 +501,7 @@ ColumnarResetRead(ColumnarReadState *readState)
|
|||
static StripeReadState *
|
||||
BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDesc,
|
||||
List *projectedColumnList, List *whereClauseList, List *whereClauseVars,
|
||||
MemoryContext stripeReadContext)
|
||||
MemoryContext stripeReadContext, Snapshot snapshot)
|
||||
{
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(stripeReadContext);
|
||||
|
||||
|
@ -513,7 +521,8 @@ BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDes
|
|||
whereClauseList,
|
||||
whereClauseVars,
|
||||
&stripeReadState->
|
||||
chunkGroupsFiltered);
|
||||
chunkGroupsFiltered,
|
||||
snapshot);
|
||||
|
||||
stripeReadState->rowCount = stripeReadState->stripeBuffers->rowCount;
|
||||
|
||||
|
@ -540,7 +549,7 @@ AdvanceStripeRead(ColumnarReadState *readState)
|
|||
StripeGetHighestRowNumber(readState->currentStripeMetadata);
|
||||
readState->currentStripeMetadata = FindNextStripeByRowNumber(readState->relation,
|
||||
lastReadRowNumber,
|
||||
GetTransactionSnapshot());
|
||||
readState->snapshot);
|
||||
readState->stripeReadState = NULL;
|
||||
MemoryContextReset(readState->stripeReadContext);
|
||||
|
||||
|
@ -792,7 +801,7 @@ static StripeBuffers *
|
|||
LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
||||
TupleDesc tupleDescriptor, List *projectedColumnList,
|
||||
List *whereClauseList, List *whereClauseVars,
|
||||
int64 *chunkGroupsFiltered)
|
||||
int64 *chunkGroupsFiltered, Snapshot snapshot)
|
||||
{
|
||||
uint32 columnIndex = 0;
|
||||
uint32 columnCount = tupleDescriptor->natts;
|
||||
|
@ -802,7 +811,8 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
|||
StripeSkipList *stripeSkipList = ReadStripeSkipList(relation->rd_node,
|
||||
stripeMetadata->id,
|
||||
tupleDescriptor,
|
||||
stripeMetadata->chunkCount);
|
||||
stripeMetadata->chunkCount,
|
||||
snapshot);
|
||||
|
||||
bool *selectedChunkMask = SelectedChunkMask(stripeSkipList, whereClauseList,
|
||||
whereClauseVars, chunkGroupsFiltered);
|
||||
|
|
|
@ -253,13 +253,13 @@ CreateColumnarScanMemoryContext(void)
|
|||
*/
|
||||
static ColumnarReadState *
|
||||
init_columnar_read_state(Relation relation, TupleDesc tupdesc, Bitmapset *attr_needed,
|
||||
List *scanQual, MemoryContext scanContext)
|
||||
List *scanQual, MemoryContext scanContext, Snapshot snapshot)
|
||||
{
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(scanContext);
|
||||
|
||||
List *neededColumnList = NeededColumnsList(tupdesc, attr_needed);
|
||||
ColumnarReadState *readState = ColumnarBeginRead(relation, tupdesc, neededColumnList,
|
||||
scanQual, scanContext);
|
||||
scanQual, scanContext, snapshot);
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
|
@ -309,7 +309,7 @@ columnar_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlo
|
|||
scan->cs_readState =
|
||||
init_columnar_read_state(scan->cs_base.rs_rd, slot->tts_tupleDescriptor,
|
||||
scan->attr_needed, scan->scanQual,
|
||||
scan->scanContext);
|
||||
scan->scanContext, scan->cs_base.rs_snapshot);
|
||||
}
|
||||
|
||||
ExecClearTuple(slot);
|
||||
|
@ -503,12 +503,13 @@ columnar_index_fetch_tuple(struct IndexFetchTableData *sscan,
|
|||
scan->cs_readState = init_columnar_read_state(columnarRelation,
|
||||
slot->tts_tupleDescriptor,
|
||||
attr_needed, scanQual,
|
||||
scan->scanContext);
|
||||
scan->scanContext,
|
||||
snapshot);
|
||||
}
|
||||
|
||||
uint64 rowNumber = tid_to_row_number(*tid);
|
||||
if (!ColumnarReadRowByRowNumber(scan->cs_readState, rowNumber, slot->tts_values,
|
||||
slot->tts_isnull, snapshot))
|
||||
slot->tts_isnull))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
@ -550,7 +551,9 @@ static bool
|
|||
columnar_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot,
|
||||
Snapshot snapshot)
|
||||
{
|
||||
return true;
|
||||
uint64 rowNumber = tid_to_row_number(slot->tts_tid);
|
||||
StripeMetadata *stripeMetadata = FindStripeByRowNumber(rel, rowNumber, snapshot);
|
||||
return stripeMetadata != NULL;
|
||||
}
|
||||
|
||||
|
||||
|
@ -800,10 +803,13 @@ columnar_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
|||
/* no quals for table rewrite */
|
||||
List *scanQual = NIL;
|
||||
|
||||
/* use SnapshotAny when re-writing table as heapAM does */
|
||||
Snapshot snapshot = SnapshotAny;
|
||||
|
||||
MemoryContext scanContext = CreateColumnarScanMemoryContext();
|
||||
ColumnarReadState *readState = init_columnar_read_state(OldHeap, sourceDesc,
|
||||
attr_needed, scanQual,
|
||||
scanContext);
|
||||
scanContext, snapshot);
|
||||
|
||||
Datum *values = palloc0(sourceDesc->natts * sizeof(Datum));
|
||||
bool *nulls = palloc0(sourceDesc->natts * sizeof(bool));
|
||||
|
@ -911,7 +917,8 @@ LogRelationStats(Relation rel, int elevel)
|
|||
StripeMetadata *stripe = lfirst(stripeMetadataCell);
|
||||
StripeSkipList *skiplist = ReadStripeSkipList(relfilenode, stripe->id,
|
||||
RelationGetDescr(rel),
|
||||
stripe->chunkCount);
|
||||
stripe->chunkCount,
|
||||
GetTransactionSnapshot());
|
||||
for (uint32 column = 0; column < skiplist->columnCount; column++)
|
||||
{
|
||||
bool attrDropped = tupdesc->attrs[column].attisdropped;
|
||||
|
|
|
@ -213,13 +213,14 @@ extern ColumnarReadState * ColumnarBeginRead(Relation relation,
|
|||
TupleDesc tupleDescriptor,
|
||||
List *projectedColumnList,
|
||||
List *qualConditions,
|
||||
MemoryContext scanContext);
|
||||
MemoryContext scanContext,
|
||||
Snapshot snaphot);
|
||||
extern bool ColumnarReadNextRow(ColumnarReadState *state, Datum *columnValues,
|
||||
bool *columnNulls, uint64 *rowNumber);
|
||||
extern void ColumnarRescan(ColumnarReadState *readState);
|
||||
extern bool ColumnarReadRowByRowNumber(ColumnarReadState *readState,
|
||||
uint64 rowNumber, Datum *columnValues,
|
||||
bool *columnNulls, Snapshot snapshot);
|
||||
bool *columnNulls);
|
||||
extern void ColumnarEndRead(ColumnarReadState *state);
|
||||
extern void ColumnarResetRead(ColumnarReadState *readState);
|
||||
extern int64 ColumnarReadChunkGroupsFiltered(ColumnarReadState *state);
|
||||
|
@ -255,7 +256,8 @@ extern void SaveChunkGroups(RelFileNode relfilenode, uint64 stripe,
|
|||
List *chunkGroupRowCounts);
|
||||
extern StripeSkipList * ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe,
|
||||
TupleDesc tupleDescriptor,
|
||||
uint32 chunkCount);
|
||||
uint32 chunkCount,
|
||||
Snapshot snapshot);
|
||||
extern StripeMetadata * FindNextStripeByRowNumber(Relation relation, uint64 rowNumber,
|
||||
Snapshot snapshot);
|
||||
extern StripeMetadata * FindStripeByRowNumber(Relation relation, uint64 rowNumber,
|
||||
|
|
Loading…
Reference in New Issue