mirror of https://github.com/citusdata/citus.git
Teach columnar index scan to read local write state, without flushing
This removes the last case in which we need to flush pending writes on master. Currently, when an index scan encounters a tid that belongs to an unflushed write of the current backend, we first flush pending writes so that we can read the tuple via the usual code path. With the changes made in this commit, we no longer flush pending writes of the current backend to read such a tuple. Instead, we read the tuple directly from the local write state of the current backend. However, exclusion constraints always need to read the tuple during index scan to detect a constraint violation. This is not a problem on master before the changes made in this commit, because, on master, exclusion constraints always flush single-tuple writes since postgres calls index_fetch_tuple for each tuple inserted. That way, concurrent writers can easily access unflushed tuples of other backends for constraint violation checks. For this reason, now that this commit removes that "flushing-on-read" logic from index scan, CI fails for some exclusion constraint tests.
pull/5340/head
parent
7bd746b91a
commit
decfa4bca2
|
@ -96,6 +96,8 @@ struct ColumnarReadState
|
|||
|
||||
Snapshot snapshot;
|
||||
bool snapshotRegisteredByUs;
|
||||
|
||||
ColumnarWriteState *cachedWriteState;
|
||||
};
|
||||
|
||||
/* static function declarations */
|
||||
|
@ -119,6 +121,12 @@ static StripeReadState * BeginStripeRead(StripeMetadata *stripeMetadata, Relatio
|
|||
List *whereClauseList, List *whereClauseVars,
|
||||
MemoryContext stripeReadContext,
|
||||
Snapshot snapshot);
|
||||
static StripeReadState * BeginBufferRead(StripeMetadata *stripeMetadata, Relation rel,
|
||||
TupleDesc tupleDesc, List *projectedColumnList,
|
||||
MemoryContext stripeReadContext);
|
||||
static StripeReadState * makeBasicStripeReadState(Relation rel, TupleDesc tupleDesc,
|
||||
List *projectedColumnList,
|
||||
MemoryContext stripeReadContext);
|
||||
static void AdvanceStripeRead(ColumnarReadState *readState);
|
||||
static bool SnapshotMightSeeUnflushedStripes(Snapshot snapshot);
|
||||
static bool ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues,
|
||||
|
@ -440,6 +448,108 @@ ColumnarReadRowByRowNumber(ColumnarReadState *readState,
|
|||
}
|
||||
|
||||
|
||||
struct ColumnarWriteState
|
||||
{
|
||||
TupleDesc tupleDescriptor;
|
||||
FmgrInfo **comparisonFunctionArray;
|
||||
RelFileNode relfilenode;
|
||||
|
||||
MemoryContext stripeWriteContext;
|
||||
MemoryContext perTupleContext;
|
||||
StripeBuffers *stripeBuffers;
|
||||
StripeSkipList *stripeSkipList;
|
||||
EmptyStripeReservation *emptyStripeReservation;
|
||||
ColumnarOptions options;
|
||||
ChunkData *chunkData;
|
||||
|
||||
List *chunkGroupRowCounts;
|
||||
|
||||
/*
|
||||
* compressionBuffer buffer is used as temporary storage during
|
||||
* data value compression operation. It is kept here to minimize
|
||||
* memory allocations. It lives in stripeWriteContext and gets
|
||||
* deallocated when memory context is reset.
|
||||
*/
|
||||
StringInfo compressionBuffer;
|
||||
};
|
||||
|
||||
void
|
||||
ColumnarReadBufferByRowNumber(ColumnarReadState *readState,
|
||||
uint64 rowNumber, Datum *columnValues,
|
||||
bool *columnNulls)
|
||||
{
|
||||
Relation columnarRelation = readState->relation;
|
||||
|
||||
bool stripeCached =
|
||||
readState->cachedWriteState &&
|
||||
rowNumber >= readState->cachedWriteState->emptyStripeReservation->stripeFirstRowNumber &&
|
||||
rowNumber < readState->cachedWriteState->emptyStripeReservation->stripeFirstRowNumber +
|
||||
readState->cachedWriteState->stripeBuffers->rowCount;
|
||||
Assert(!stripeCached || (readState->currentStripeMetadata &&
|
||||
readState->currentStripeMetadata->id ==
|
||||
readState->cachedWriteState->emptyStripeReservation->stripeId));
|
||||
|
||||
if (!stripeCached)
|
||||
{
|
||||
readState->currentStripeMetadata = FindStripeWithMatchingFirstRowNumber(columnarRelation,
|
||||
rowNumber,
|
||||
readState->snapshot);
|
||||
if (!readState->currentStripeMetadata)
|
||||
{
|
||||
ereport(ERROR, (errmsg("not found 3")));
|
||||
}
|
||||
|
||||
|
||||
readState->cachedWriteState = FindWriteStateByStripeId(columnarRelation->rd_node.relNode,
|
||||
readState->currentStripeMetadata->id);
|
||||
if (!readState->cachedWriteState)
|
||||
{
|
||||
ereport(ERROR, (errmsg("not found 4")));
|
||||
}
|
||||
|
||||
TupleDesc relationTupleDesc = RelationGetDescr(columnarRelation);
|
||||
readState->stripeReadState = BeginBufferRead(readState->currentStripeMetadata,
|
||||
columnarRelation,
|
||||
relationTupleDesc,
|
||||
readState->projectedColumnList,
|
||||
readState->stripeReadContext);
|
||||
}
|
||||
|
||||
uint64 rowOffset = rowNumber - readState->currentStripeMetadata->firstRowNumber;
|
||||
uint64 serializedRowCount = ColumnarWriteSerializedRowCount(readState->cachedWriteState);
|
||||
if (rowOffset < serializedRowCount)
|
||||
{
|
||||
ReadStripeRowByRowNumber(readState, rowNumber, columnValues, columnNulls);
|
||||
}
|
||||
else
|
||||
{
|
||||
ChunkData *chunkData = ColumnarWriteChunkData(readState->cachedWriteState);
|
||||
uint64 chunkDataOffset = rowOffset - serializedRowCount + 1;
|
||||
for (uint32 columnIndex = 0; columnIndex < chunkData->columnCount; columnIndex++)
|
||||
{
|
||||
if (chunkData->existsArray[columnIndex][chunkDataOffset])
|
||||
{
|
||||
columnValues[columnIndex] = chunkData->valueArray[columnIndex][chunkDataOffset];
|
||||
columnNulls[columnIndex] = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
columnNulls[columnIndex] = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Snapshot
|
||||
ColumnarReadSetSnapshot(ColumnarReadState *readState, Snapshot snapshot)
|
||||
{
|
||||
Snapshot oldSnapshot = readState->snapshot;
|
||||
readState->snapshot = snapshot;
|
||||
return oldSnapshot;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ColumnarReadIsCurrentStripe returns true if stripe being read contains
|
||||
* row with given rowNumber.
|
||||
|
@ -651,14 +761,9 @@ BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDes
|
|||
{
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(stripeReadContext);
|
||||
|
||||
StripeReadState *stripeReadState = palloc0(sizeof(StripeReadState));
|
||||
|
||||
stripeReadState->relation = rel;
|
||||
stripeReadState->tupleDescriptor = tupleDesc;
|
||||
stripeReadState->columnCount = tupleDesc->natts;
|
||||
stripeReadState->chunkGroupReadState = NULL;
|
||||
stripeReadState->projectedColumnList = projectedColumnList;
|
||||
stripeReadState->stripeReadContext = stripeReadContext;
|
||||
StripeReadState *stripeReadState = makeBasicStripeReadState(rel, tupleDesc,
|
||||
projectedColumnList,
|
||||
stripeReadContext);
|
||||
|
||||
stripeReadState->stripeBuffers = LoadFilteredStripeBuffers(rel,
|
||||
stripeMetadata,
|
||||
|
@ -669,11 +774,50 @@ BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDes
|
|||
&stripeReadState->
|
||||
chunkGroupsFiltered,
|
||||
snapshot);
|
||||
|
||||
stripeReadState->rowCount = stripeReadState->stripeBuffers->rowCount;
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
return stripeReadState;
|
||||
}
|
||||
|
||||
|
||||
|
||||
static StripeReadState *
|
||||
BeginBufferRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDesc,
|
||||
List *projectedColumnList, MemoryContext stripeReadContext)
|
||||
{
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(stripeReadContext);
|
||||
|
||||
StripeReadState *stripeReadState = makeBasicStripeReadState(rel, tupleDesc,
|
||||
projectedColumnList,
|
||||
stripeReadContext);
|
||||
|
||||
stripeReadState->stripeBuffers =
|
||||
ColumnarWriteStripeBuffers(FindWriteStateByStripeId(rel->rd_node.relNode,
|
||||
stripeMetadata->id));
|
||||
stripeReadState->rowCount = stripeReadState->stripeBuffers->rowCount;
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
return stripeReadState;
|
||||
}
|
||||
|
||||
|
||||
|
||||
static StripeReadState *
|
||||
makeBasicStripeReadState(Relation rel, TupleDesc tupleDesc,
|
||||
List *projectedColumnList,
|
||||
MemoryContext stripeReadContext)
|
||||
{
|
||||
StripeReadState *stripeReadState = palloc0(sizeof(StripeReadState));
|
||||
|
||||
stripeReadState->relation = rel;
|
||||
stripeReadState->tupleDescriptor = tupleDesc;
|
||||
stripeReadState->columnCount = tupleDesc->natts;
|
||||
stripeReadState->chunkGroupReadState = NULL;
|
||||
stripeReadState->projectedColumnList = projectedColumnList;
|
||||
stripeReadState->stripeReadContext = stripeReadContext;
|
||||
|
||||
return stripeReadState;
|
||||
}
|
||||
|
@ -924,7 +1068,7 @@ CreateEmptyChunkData(uint32 columnCount, bool *columnMask, uint32 chunkGroupRowC
|
|||
chunkData->valueArray = palloc0(columnCount * sizeof(Datum *));
|
||||
chunkData->valueBufferArray = palloc0(columnCount * sizeof(StringInfo));
|
||||
chunkData->columnCount = columnCount;
|
||||
chunkData->rowCount = chunkGroupRowCount;
|
||||
chunkData->rowCount = 0;
|
||||
|
||||
/* allocate chunk memory for deserialized data */
|
||||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||
|
|
|
@ -545,26 +545,13 @@ columnar_index_fetch_tuple(struct IndexFetchTableData *sscan,
|
|||
{
|
||||
if (stripeMetadata->insertedByCurrentXact)
|
||||
{
|
||||
/*
|
||||
* Stripe write is in progress and its entry is inserted by current
|
||||
* transaction, so obviously it must be written by me. Since caller
|
||||
* might want to use tupleslot datums for some reason, do another
|
||||
* look-up, but this time by first flushing our writes.
|
||||
*
|
||||
* XXX: For index scan, this is the only case that we flush pending
|
||||
* writes of the current backend. If we have taught reader how to
|
||||
* read from WriteStateMap. then we could guarantee that
|
||||
* index_fetch_tuple would never flush pending writes, but this seem
|
||||
* to be too much work for now, but should be doable.
|
||||
*/
|
||||
ColumnarReadFlushPendingWrites(scan->cs_readState);
|
||||
|
||||
/*
|
||||
* Fill the tupleslot and fall through to return true, it
|
||||
* certainly exists.
|
||||
*/
|
||||
ColumnarReadRowByRowNumberOrError(scan->cs_readState, rowNumber,
|
||||
Snapshot oldSnapshot = ColumnarReadSetSnapshot(scan->cs_readState, SnapshotSelf);
|
||||
ColumnarReadBufferByRowNumber(scan->cs_readState, rowNumber,
|
||||
slot->tts_values, slot->tts_isnull);
|
||||
ColumnarReadSetSnapshot(scan->cs_readState, oldSnapshot);
|
||||
|
||||
/* make sure to copy slot out of writer's memory context */
|
||||
slot->tts_ops->materialize(slot);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -217,6 +217,7 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu
|
|||
char columnTypeAlign = attributeForm->attalign;
|
||||
|
||||
chunkData->existsArray[columnIndex][chunkRowIndex] = true;
|
||||
chunkData->valueArray[columnIndex][chunkRowIndex] = columnValues[columnIndex];
|
||||
|
||||
SerializeSingleDatum(chunkData->valueBufferArray[columnIndex],
|
||||
columnValues[columnIndex], columnTypeByValue,
|
||||
|
@ -230,6 +231,8 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu
|
|||
chunkSkipNode->rowCount++;
|
||||
}
|
||||
|
||||
chunkData->rowCount++;
|
||||
|
||||
stripeSkipList->chunkCount = chunkIndex + 1;
|
||||
|
||||
/* last row of the chunk is inserted serialize the chunk */
|
||||
|
@ -241,6 +244,7 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu
|
|||
uint64 writtenRowNumber = writeState->emptyStripeReservation->stripeFirstRowNumber +
|
||||
stripeBuffers->rowCount;
|
||||
stripeBuffers->rowCount++;
|
||||
stripeBuffers->selectedChunkGroupRowCounts[chunkIndex]++;
|
||||
if (stripeBuffers->rowCount >= options->stripeRowCount)
|
||||
{
|
||||
ColumnarFlushPendingWrites(writeState);
|
||||
|
@ -300,6 +304,33 @@ ColumnarWritePerTupleContext(ColumnarWriteState *state)
|
|||
}
|
||||
|
||||
|
||||
uint64
|
||||
ColumnarWriteStripeId(ColumnarWriteState *writeState)
|
||||
{
|
||||
return writeState->emptyStripeReservation->stripeId;
|
||||
}
|
||||
|
||||
|
||||
StripeBuffers *
|
||||
ColumnarWriteStripeBuffers(ColumnarWriteState *writeState)
|
||||
{
|
||||
return writeState->stripeBuffers;
|
||||
}
|
||||
|
||||
|
||||
uint64
|
||||
ColumnarWriteSerializedRowCount(ColumnarWriteState *writeState)
|
||||
{
|
||||
return writeState->stripeBuffers->rowCount - writeState->chunkData->rowCount;
|
||||
}
|
||||
|
||||
|
||||
ChunkData *
|
||||
ColumnarWriteChunkData(ColumnarWriteState *writeState)
|
||||
{
|
||||
return writeState->chunkData;
|
||||
}
|
||||
|
||||
/*
|
||||
* CreateEmptyStripeBuffers allocates an empty StripeBuffers structure with the given
|
||||
* column count.
|
||||
|
@ -335,6 +366,9 @@ CreateEmptyStripeBuffers(uint32 stripeMaxRowCount, uint32 chunkRowCount,
|
|||
stripeBuffers->columnCount = columnCount;
|
||||
stripeBuffers->rowCount = 0;
|
||||
|
||||
stripeBuffers->selectedChunkGroupRowCounts =
|
||||
palloc0((1 + ((stripeMaxRowCount - 1) / chunkRowCount)) * sizeof(uint32));
|
||||
|
||||
return stripeBuffers;
|
||||
}
|
||||
|
||||
|
@ -632,6 +666,7 @@ SerializeChunkData(ColumnarWriteState *writeState, uint32 chunkIndex, uint32 row
|
|||
|
||||
/* valueBuffer needs to be reset for next chunk's data */
|
||||
resetStringInfo(chunkData->valueBufferArray[columnIndex]);
|
||||
chunkData->rowCount = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -196,6 +196,35 @@ columnar_init_write_state(Relation relation, TupleDesc tupdesc,
|
|||
}
|
||||
|
||||
|
||||
|
||||
ColumnarWriteState *
|
||||
FindWriteStateByStripeId(Oid relNode, uint64 stripeId)
|
||||
{
|
||||
bool found = false;
|
||||
WriteStateMapEntry *hashEntry = hash_search(WriteStateMap, &relNode,
|
||||
HASH_FIND, &found);
|
||||
if (!found)
|
||||
{
|
||||
ereport(ERROR, (errmsg("not found 1")));
|
||||
}
|
||||
|
||||
SubXidWriteState *stackHead = hashEntry->writeStateStack;
|
||||
while (stackHead)
|
||||
{
|
||||
ColumnarWriteState *writeState = stackHead->writeState;
|
||||
if (stripeId != ColumnarWriteStripeId(writeState))
|
||||
{
|
||||
stackHead = stackHead->next;
|
||||
continue;
|
||||
}
|
||||
|
||||
return writeState;
|
||||
}
|
||||
|
||||
ereport(ERROR, (errmsg("not found 2")));
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Flushes pending writes for given relfilenode in the given subtransaction.
|
||||
*/
|
||||
|
|
|
@ -228,6 +228,10 @@ extern void ColumnarFlushPendingWrites(ColumnarWriteState *state);
|
|||
extern void ColumnarEndWrite(ColumnarWriteState *state);
|
||||
extern bool ContainsPendingWrites(ColumnarWriteState *state);
|
||||
extern MemoryContext ColumnarWritePerTupleContext(ColumnarWriteState *state);
|
||||
extern uint64 ColumnarWriteStripeId(ColumnarWriteState *writeState);
|
||||
extern StripeBuffers * ColumnarWriteStripeBuffers(ColumnarWriteState *writeState);
|
||||
extern uint64 ColumnarWriteSerializedRowCount(ColumnarWriteState *writeState);
|
||||
extern ChunkData *ColumnarWriteChunkData(ColumnarWriteState *writeState);
|
||||
|
||||
/* Function declarations for reading from columnar table */
|
||||
|
||||
|
@ -256,6 +260,10 @@ extern void ColumnarReadRowByRowNumberOrError(ColumnarReadState *readState,
|
|||
extern bool ColumnarReadRowByRowNumber(ColumnarReadState *readState,
|
||||
uint64 rowNumber, Datum *columnValues,
|
||||
bool *columnNulls);
|
||||
extern void ColumnarReadBufferByRowNumber(ColumnarReadState *readState,
|
||||
uint64 rowNumber, Datum *columnValues,
|
||||
bool *columnNulls);
|
||||
extern Snapshot ColumnarReadSetSnapshot(ColumnarReadState *readState, Snapshot snapshot);
|
||||
|
||||
/* Function declarations for common functions */
|
||||
extern FmgrInfo * GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId,
|
||||
|
@ -310,6 +318,7 @@ extern Datum columnar_relation_storageid(PG_FUNCTION_ARGS);
|
|||
extern ColumnarWriteState * columnar_init_write_state(Relation relation, TupleDesc
|
||||
tupdesc,
|
||||
SubTransactionId currentSubXid);
|
||||
extern ColumnarWriteState * FindWriteStateByStripeId(Oid relNode, uint64 stripeId);
|
||||
extern void FlushWriteStateForRelfilenode(Oid relfilenode, SubTransactionId
|
||||
currentSubXid);
|
||||
extern void FlushWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId
|
||||
|
|
Loading…
Reference in New Issue