mirror of https://github.com/citusdata/citus.git
Merge pull request #5154 from citusdata/col/use-correct-snapshot
commit
0bf29200eb
|
@ -88,7 +88,7 @@ static void GetHighestUsedAddressAndId(uint64 storageId,
|
||||||
static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot);
|
static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot);
|
||||||
static StripeMetadata * BuildStripeMetadata(Datum *datumArray);
|
static StripeMetadata * BuildStripeMetadata(Datum *datumArray);
|
||||||
static uint32 * ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32
|
static uint32 * ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32
|
||||||
chunkGroupCount);
|
chunkGroupCount, Snapshot snapshot);
|
||||||
static Oid ColumnarStorageIdSequenceRelationId(void);
|
static Oid ColumnarStorageIdSequenceRelationId(void);
|
||||||
static Oid ColumnarStripeRelationId(void);
|
static Oid ColumnarStripeRelationId(void);
|
||||||
static Oid ColumnarStripePKeyIndexRelationId(void);
|
static Oid ColumnarStripePKeyIndexRelationId(void);
|
||||||
|
@ -484,8 +484,6 @@ SaveStripeSkipList(RelFileNode relfilenode, uint64 stripe, StripeSkipList *chunk
|
||||||
|
|
||||||
FinishModifyRelation(modifyState);
|
FinishModifyRelation(modifyState);
|
||||||
table_close(columnarChunk, RowExclusiveLock);
|
table_close(columnarChunk, RowExclusiveLock);
|
||||||
|
|
||||||
CommandCounterIncrement();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -522,8 +520,6 @@ SaveChunkGroups(RelFileNode relfilenode, uint64 stripe,
|
||||||
|
|
||||||
FinishModifyRelation(modifyState);
|
FinishModifyRelation(modifyState);
|
||||||
table_close(columnarChunkGroup, NoLock);
|
table_close(columnarChunkGroup, NoLock);
|
||||||
|
|
||||||
CommandCounterIncrement();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -532,7 +528,7 @@ SaveChunkGroups(RelFileNode relfilenode, uint64 stripe,
|
||||||
*/
|
*/
|
||||||
StripeSkipList *
|
StripeSkipList *
|
||||||
ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescriptor,
|
ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescriptor,
|
||||||
uint32 chunkCount)
|
uint32 chunkCount, Snapshot snapshot)
|
||||||
{
|
{
|
||||||
int32 columnIndex = 0;
|
int32 columnIndex = 0;
|
||||||
HeapTuple heapTuple = NULL;
|
HeapTuple heapTuple = NULL;
|
||||||
|
@ -550,8 +546,8 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri
|
||||||
ScanKeyInit(&scanKey[1], Anum_columnar_chunk_stripe,
|
ScanKeyInit(&scanKey[1], Anum_columnar_chunk_stripe,
|
||||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe));
|
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe));
|
||||||
|
|
||||||
SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarChunk, index, NULL,
|
SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarChunk, index,
|
||||||
2, scanKey);
|
snapshot, 2, scanKey);
|
||||||
|
|
||||||
StripeSkipList *chunkList = palloc0(sizeof(StripeSkipList));
|
StripeSkipList *chunkList = palloc0(sizeof(StripeSkipList));
|
||||||
chunkList->chunkCount = chunkCount;
|
chunkList->chunkCount = chunkCount;
|
||||||
|
@ -634,7 +630,7 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri
|
||||||
table_close(columnarChunk, AccessShareLock);
|
table_close(columnarChunk, AccessShareLock);
|
||||||
|
|
||||||
chunkList->chunkGroupRowCounts =
|
chunkList->chunkGroupRowCounts =
|
||||||
ReadChunkGroupRowCounts(storageId, stripe, chunkCount);
|
ReadChunkGroupRowCounts(storageId, stripe, chunkCount, snapshot);
|
||||||
|
|
||||||
return chunkList;
|
return chunkList;
|
||||||
}
|
}
|
||||||
|
@ -803,7 +799,8 @@ FindStripeWithHighestRowNumber(Relation relation, Snapshot snapshot)
|
||||||
* given stripe.
|
* given stripe.
|
||||||
*/
|
*/
|
||||||
static uint32 *
|
static uint32 *
|
||||||
ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount)
|
ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount,
|
||||||
|
Snapshot snapshot)
|
||||||
{
|
{
|
||||||
Oid columnarChunkGroupOid = ColumnarChunkGroupRelationId();
|
Oid columnarChunkGroupOid = ColumnarChunkGroupRelationId();
|
||||||
Relation columnarChunkGroup = table_open(columnarChunkGroupOid, AccessShareLock);
|
Relation columnarChunkGroup = table_open(columnarChunkGroupOid, AccessShareLock);
|
||||||
|
@ -816,7 +813,7 @@ ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount)
|
||||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe));
|
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe));
|
||||||
|
|
||||||
SysScanDesc scanDescriptor =
|
SysScanDesc scanDescriptor =
|
||||||
systable_beginscan_ordered(columnarChunkGroup, index, NULL, 2, scanKey);
|
systable_beginscan_ordered(columnarChunkGroup, index, snapshot, 2, scanKey);
|
||||||
|
|
||||||
uint32 chunkGroupIndex = 0;
|
uint32 chunkGroupIndex = 0;
|
||||||
HeapTuple heapTuple = NULL;
|
HeapTuple heapTuple = NULL;
|
||||||
|
@ -886,8 +883,6 @@ InsertStripeMetadataRow(uint64 storageId, StripeMetadata *stripe)
|
||||||
|
|
||||||
FinishModifyRelation(modifyState);
|
FinishModifyRelation(modifyState);
|
||||||
|
|
||||||
CommandCounterIncrement();
|
|
||||||
|
|
||||||
table_close(columnarStripes, RowExclusiveLock);
|
table_close(columnarStripes, RowExclusiveLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1205,6 +1200,8 @@ FinishModifyRelation(ModifyState *state)
|
||||||
ExecCleanUpTriggerState(state->estate);
|
ExecCleanUpTriggerState(state->estate);
|
||||||
ExecResetTupleTable(state->estate->es_tupleTable, false);
|
ExecResetTupleTable(state->estate->es_tupleTable, false);
|
||||||
FreeExecutorState(state->estate);
|
FreeExecutorState(state->estate);
|
||||||
|
|
||||||
|
CommandCounterIncrement();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -88,6 +88,9 @@ struct ColumnarReadState
|
||||||
* itself.
|
* itself.
|
||||||
*/
|
*/
|
||||||
MemoryContext scanContext;
|
MemoryContext scanContext;
|
||||||
|
|
||||||
|
Snapshot snapshot;
|
||||||
|
bool snapshotRegisteredByUs;
|
||||||
};
|
};
|
||||||
|
|
||||||
/* static function declarations */
|
/* static function declarations */
|
||||||
|
@ -109,7 +112,8 @@ static bool HasUnreadStripe(ColumnarReadState *readState);
|
||||||
static StripeReadState * BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel,
|
static StripeReadState * BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel,
|
||||||
TupleDesc tupleDesc, List *projectedColumnList,
|
TupleDesc tupleDesc, List *projectedColumnList,
|
||||||
List *whereClauseList, List *whereClauseVars,
|
List *whereClauseList, List *whereClauseVars,
|
||||||
MemoryContext stripeReadContext);
|
MemoryContext stripeReadContext,
|
||||||
|
Snapshot snapshot);
|
||||||
static void AdvanceStripeRead(ColumnarReadState *readState);
|
static void AdvanceStripeRead(ColumnarReadState *readState);
|
||||||
static bool ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues,
|
static bool ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues,
|
||||||
bool *columnNulls);
|
bool *columnNulls);
|
||||||
|
@ -128,7 +132,8 @@ static StripeBuffers * LoadFilteredStripeBuffers(Relation relation,
|
||||||
List *projectedColumnList,
|
List *projectedColumnList,
|
||||||
List *whereClauseList,
|
List *whereClauseList,
|
||||||
List *whereClauseVars,
|
List *whereClauseVars,
|
||||||
int64 *chunkGroupsFiltered);
|
int64 *chunkGroupsFiltered,
|
||||||
|
Snapshot snapshot);
|
||||||
static ColumnBuffers * LoadColumnBuffers(Relation relation,
|
static ColumnBuffers * LoadColumnBuffers(Relation relation,
|
||||||
ColumnChunkSkipNode *chunkSkipNodeArray,
|
ColumnChunkSkipNode *chunkSkipNodeArray,
|
||||||
uint32 chunkCount, uint64 stripeOffset,
|
uint32 chunkCount, uint64 stripeOffset,
|
||||||
|
@ -167,7 +172,8 @@ static Datum ColumnDefaultValue(TupleConstr *tupleConstraints,
|
||||||
ColumnarReadState *
|
ColumnarReadState *
|
||||||
ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
|
ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
|
||||||
List *projectedColumnList, List *whereClauseList,
|
List *projectedColumnList, List *whereClauseList,
|
||||||
MemoryContext scanContext)
|
MemoryContext scanContext, Snapshot snapshot,
|
||||||
|
bool snapshotRegisteredByUs)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* We allocate all stripe specific data in the stripeReadContext, and reset
|
* We allocate all stripe specific data in the stripeReadContext, and reset
|
||||||
|
@ -187,8 +193,10 @@ ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
|
||||||
readState->stripeReadState = NULL;
|
readState->stripeReadState = NULL;
|
||||||
readState->currentStripeMetadata = FindNextStripeByRowNumber(relation,
|
readState->currentStripeMetadata = FindNextStripeByRowNumber(relation,
|
||||||
COLUMNAR_INVALID_ROW_NUMBER,
|
COLUMNAR_INVALID_ROW_NUMBER,
|
||||||
GetTransactionSnapshot());
|
snapshot);
|
||||||
readState->scanContext = scanContext;
|
readState->scanContext = scanContext;
|
||||||
|
readState->snapshot = snapshot;
|
||||||
|
readState->snapshotRegisteredByUs = snapshotRegisteredByUs;
|
||||||
|
|
||||||
return readState;
|
return readState;
|
||||||
}
|
}
|
||||||
|
@ -230,7 +238,8 @@ ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *col
|
||||||
readState->projectedColumnList,
|
readState->projectedColumnList,
|
||||||
readState->whereClauseList,
|
readState->whereClauseList,
|
||||||
readState->whereClauseVars,
|
readState->whereClauseVars,
|
||||||
readState->stripeReadContext);
|
readState->stripeReadContext,
|
||||||
|
readState->snapshot);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!ReadStripeNextRow(readState->stripeReadState, columnValues, columnNulls))
|
if (!ReadStripeNextRow(readState->stripeReadState, columnValues, columnNulls))
|
||||||
|
@ -260,11 +269,12 @@ ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *col
|
||||||
bool
|
bool
|
||||||
ColumnarReadRowByRowNumber(ColumnarReadState *readState,
|
ColumnarReadRowByRowNumber(ColumnarReadState *readState,
|
||||||
uint64 rowNumber, Datum *columnValues,
|
uint64 rowNumber, Datum *columnValues,
|
||||||
bool *columnNulls, Snapshot snapshot)
|
bool *columnNulls)
|
||||||
{
|
{
|
||||||
if (!ColumnarReadIsCurrentStripe(readState, rowNumber))
|
if (!ColumnarReadIsCurrentStripe(readState, rowNumber))
|
||||||
{
|
{
|
||||||
Relation columnarRelation = readState->relation;
|
Relation columnarRelation = readState->relation;
|
||||||
|
Snapshot snapshot = readState->snapshot;
|
||||||
StripeMetadata *stripeMetadata = FindStripeByRowNumber(columnarRelation,
|
StripeMetadata *stripeMetadata = FindStripeByRowNumber(columnarRelation,
|
||||||
rowNumber, snapshot);
|
rowNumber, snapshot);
|
||||||
if (stripeMetadata == NULL)
|
if (stripeMetadata == NULL)
|
||||||
|
@ -286,7 +296,8 @@ ColumnarReadRowByRowNumber(ColumnarReadState *readState,
|
||||||
readState->projectedColumnList,
|
readState->projectedColumnList,
|
||||||
whereClauseList,
|
whereClauseList,
|
||||||
whereClauseVars,
|
whereClauseVars,
|
||||||
stripeReadContext);
|
stripeReadContext,
|
||||||
|
snapshot);
|
||||||
|
|
||||||
readState->currentStripeMetadata = stripeMetadata;
|
readState->currentStripeMetadata = stripeMetadata;
|
||||||
}
|
}
|
||||||
|
@ -446,7 +457,7 @@ ColumnarRescan(ColumnarReadState *readState)
|
||||||
ColumnarResetRead(readState);
|
ColumnarResetRead(readState);
|
||||||
readState->currentStripeMetadata = FindNextStripeByRowNumber(readState->relation,
|
readState->currentStripeMetadata = FindNextStripeByRowNumber(readState->relation,
|
||||||
COLUMNAR_INVALID_ROW_NUMBER,
|
COLUMNAR_INVALID_ROW_NUMBER,
|
||||||
GetTransactionSnapshot());
|
readState->snapshot);
|
||||||
readState->chunkGroupsFiltered = 0;
|
readState->chunkGroupsFiltered = 0;
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
@ -459,6 +470,15 @@ ColumnarRescan(ColumnarReadState *readState)
|
||||||
void
|
void
|
||||||
ColumnarEndRead(ColumnarReadState *readState)
|
ColumnarEndRead(ColumnarReadState *readState)
|
||||||
{
|
{
|
||||||
|
if (readState->snapshotRegisteredByUs)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* init_columnar_read_state created a new snapshot and registered it,
|
||||||
|
* so now forget it.
|
||||||
|
*/
|
||||||
|
UnregisterSnapshot(readState->snapshot);
|
||||||
|
}
|
||||||
|
|
||||||
MemoryContextDelete(readState->stripeReadContext);
|
MemoryContextDelete(readState->stripeReadContext);
|
||||||
if (readState->currentStripeMetadata)
|
if (readState->currentStripeMetadata)
|
||||||
{
|
{
|
||||||
|
@ -493,7 +513,7 @@ ColumnarResetRead(ColumnarReadState *readState)
|
||||||
static StripeReadState *
|
static StripeReadState *
|
||||||
BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDesc,
|
BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDesc,
|
||||||
List *projectedColumnList, List *whereClauseList, List *whereClauseVars,
|
List *projectedColumnList, List *whereClauseList, List *whereClauseVars,
|
||||||
MemoryContext stripeReadContext)
|
MemoryContext stripeReadContext, Snapshot snapshot)
|
||||||
{
|
{
|
||||||
MemoryContext oldContext = MemoryContextSwitchTo(stripeReadContext);
|
MemoryContext oldContext = MemoryContextSwitchTo(stripeReadContext);
|
||||||
|
|
||||||
|
@ -513,7 +533,8 @@ BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDes
|
||||||
whereClauseList,
|
whereClauseList,
|
||||||
whereClauseVars,
|
whereClauseVars,
|
||||||
&stripeReadState->
|
&stripeReadState->
|
||||||
chunkGroupsFiltered);
|
chunkGroupsFiltered,
|
||||||
|
snapshot);
|
||||||
|
|
||||||
stripeReadState->rowCount = stripeReadState->stripeBuffers->rowCount;
|
stripeReadState->rowCount = stripeReadState->stripeBuffers->rowCount;
|
||||||
|
|
||||||
|
@ -540,7 +561,7 @@ AdvanceStripeRead(ColumnarReadState *readState)
|
||||||
StripeGetHighestRowNumber(readState->currentStripeMetadata);
|
StripeGetHighestRowNumber(readState->currentStripeMetadata);
|
||||||
readState->currentStripeMetadata = FindNextStripeByRowNumber(readState->relation,
|
readState->currentStripeMetadata = FindNextStripeByRowNumber(readState->relation,
|
||||||
lastReadRowNumber,
|
lastReadRowNumber,
|
||||||
GetTransactionSnapshot());
|
readState->snapshot);
|
||||||
readState->stripeReadState = NULL;
|
readState->stripeReadState = NULL;
|
||||||
MemoryContextReset(readState->stripeReadContext);
|
MemoryContextReset(readState->stripeReadContext);
|
||||||
|
|
||||||
|
@ -792,7 +813,7 @@ static StripeBuffers *
|
||||||
LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
||||||
TupleDesc tupleDescriptor, List *projectedColumnList,
|
TupleDesc tupleDescriptor, List *projectedColumnList,
|
||||||
List *whereClauseList, List *whereClauseVars,
|
List *whereClauseList, List *whereClauseVars,
|
||||||
int64 *chunkGroupsFiltered)
|
int64 *chunkGroupsFiltered, Snapshot snapshot)
|
||||||
{
|
{
|
||||||
uint32 columnIndex = 0;
|
uint32 columnIndex = 0;
|
||||||
uint32 columnCount = tupleDescriptor->natts;
|
uint32 columnCount = tupleDescriptor->natts;
|
||||||
|
@ -802,7 +823,8 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
||||||
StripeSkipList *stripeSkipList = ReadStripeSkipList(relation->rd_node,
|
StripeSkipList *stripeSkipList = ReadStripeSkipList(relation->rd_node,
|
||||||
stripeMetadata->id,
|
stripeMetadata->id,
|
||||||
tupleDescriptor,
|
tupleDescriptor,
|
||||||
stripeMetadata->chunkCount);
|
stripeMetadata->chunkCount,
|
||||||
|
snapshot);
|
||||||
|
|
||||||
bool *selectedChunkMask = SelectedChunkMask(stripeSkipList, whereClauseList,
|
bool *selectedChunkMask = SelectedChunkMask(stripeSkipList, whereClauseList,
|
||||||
whereClauseVars, chunkGroupsFiltered);
|
whereClauseVars, chunkGroupsFiltered);
|
||||||
|
|
|
@ -227,8 +227,6 @@ columnar_beginscan_extended(Relation relation, Snapshot snapshot,
|
||||||
"cannot read from table when there is unflushed data in upper transactions");
|
"cannot read from table when there is unflushed data in upper transactions");
|
||||||
}
|
}
|
||||||
|
|
||||||
FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId());
|
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
|
||||||
return ((TableScanDesc) scan);
|
return ((TableScanDesc) scan);
|
||||||
|
@ -253,13 +251,56 @@ CreateColumnarScanMemoryContext(void)
|
||||||
*/
|
*/
|
||||||
static ColumnarReadState *
|
static ColumnarReadState *
|
||||||
init_columnar_read_state(Relation relation, TupleDesc tupdesc, Bitmapset *attr_needed,
|
init_columnar_read_state(Relation relation, TupleDesc tupdesc, Bitmapset *attr_needed,
|
||||||
List *scanQual, MemoryContext scanContext)
|
List *scanQual, MemoryContext scanContext, Snapshot snapshot)
|
||||||
{
|
{
|
||||||
MemoryContext oldContext = MemoryContextSwitchTo(scanContext);
|
MemoryContext oldContext = MemoryContextSwitchTo(scanContext);
|
||||||
|
|
||||||
|
Oid relfilenode = relation->rd_node.relNode;
|
||||||
|
FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId());
|
||||||
|
|
||||||
|
bool snapshotRegisteredByUs = false;
|
||||||
|
if (snapshot != InvalidSnapshot && IsMVCCSnapshot(snapshot))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* If we flushed any pending writes, then we should guarantee that
|
||||||
|
* those writes are visible to us too. For this reason, if given
|
||||||
|
* snapshot is an MVCC snapshot, then we set its curcid to current
|
||||||
|
* command id.
|
||||||
|
*
|
||||||
|
* For simplicity, we do that even if we didn't flush any writes
|
||||||
|
* since we don't see any problem with that.
|
||||||
|
*
|
||||||
|
* XXX: We should either not update cid if we are executing a FETCH
|
||||||
|
* (from cursor) command, or we should have a better way to deal with
|
||||||
|
* pending writes, see the discussion in
|
||||||
|
* https://github.com/citusdata/citus/issues/5231.
|
||||||
|
*/
|
||||||
|
PushCopiedSnapshot(snapshot);
|
||||||
|
|
||||||
|
/* now our snapshot is the active one */
|
||||||
|
UpdateActiveSnapshotCommandId();
|
||||||
|
snapshot = GetActiveSnapshot();
|
||||||
|
RegisterSnapshot(snapshot);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* To be able to use UpdateActiveSnapshotCommandId, we pushed the
|
||||||
|
* copied snapshot to the stack. However, we don't need to keep it
|
||||||
|
* there since we will anyway rely on ColumnarReadState->snapshot
|
||||||
|
* during read operation.
|
||||||
|
*
|
||||||
|
* Note that since we registered the snapshot already, we guarantee
|
||||||
|
* that PopActiveSnapshot won't free it.
|
||||||
|
*/
|
||||||
|
PopActiveSnapshot();
|
||||||
|
|
||||||
|
/* not forget to unregister it when finishing read operation */
|
||||||
|
snapshotRegisteredByUs = true;
|
||||||
|
}
|
||||||
|
|
||||||
List *neededColumnList = NeededColumnsList(tupdesc, attr_needed);
|
List *neededColumnList = NeededColumnsList(tupdesc, attr_needed);
|
||||||
ColumnarReadState *readState = ColumnarBeginRead(relation, tupdesc, neededColumnList,
|
ColumnarReadState *readState = ColumnarBeginRead(relation, tupdesc, neededColumnList,
|
||||||
scanQual, scanContext);
|
scanQual, scanContext, snapshot,
|
||||||
|
snapshotRegisteredByUs);
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
|
||||||
|
@ -309,7 +350,7 @@ columnar_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlo
|
||||||
scan->cs_readState =
|
scan->cs_readState =
|
||||||
init_columnar_read_state(scan->cs_base.rs_rd, slot->tts_tupleDescriptor,
|
init_columnar_read_state(scan->cs_base.rs_rd, slot->tts_tupleDescriptor,
|
||||||
scan->attr_needed, scan->scanQual,
|
scan->attr_needed, scan->scanQual,
|
||||||
scan->scanContext);
|
scan->scanContext, scan->cs_base.rs_snapshot);
|
||||||
}
|
}
|
||||||
|
|
||||||
ExecClearTuple(slot);
|
ExecClearTuple(slot);
|
||||||
|
@ -429,8 +470,6 @@ columnar_index_fetch_begin(Relation rel)
|
||||||
"upper transactions");
|
"upper transactions");
|
||||||
}
|
}
|
||||||
|
|
||||||
FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId());
|
|
||||||
|
|
||||||
MemoryContext scanContext = CreateColumnarScanMemoryContext();
|
MemoryContext scanContext = CreateColumnarScanMemoryContext();
|
||||||
MemoryContext oldContext = MemoryContextSwitchTo(scanContext);
|
MemoryContext oldContext = MemoryContextSwitchTo(scanContext);
|
||||||
|
|
||||||
|
@ -503,12 +542,13 @@ columnar_index_fetch_tuple(struct IndexFetchTableData *sscan,
|
||||||
scan->cs_readState = init_columnar_read_state(columnarRelation,
|
scan->cs_readState = init_columnar_read_state(columnarRelation,
|
||||||
slot->tts_tupleDescriptor,
|
slot->tts_tupleDescriptor,
|
||||||
attr_needed, scanQual,
|
attr_needed, scanQual,
|
||||||
scan->scanContext);
|
scan->scanContext,
|
||||||
|
snapshot);
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64 rowNumber = tid_to_row_number(*tid);
|
uint64 rowNumber = tid_to_row_number(*tid);
|
||||||
if (!ColumnarReadRowByRowNumber(scan->cs_readState, rowNumber, slot->tts_values,
|
if (!ColumnarReadRowByRowNumber(scan->cs_readState, rowNumber, slot->tts_values,
|
||||||
slot->tts_isnull, snapshot))
|
slot->tts_isnull))
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -550,7 +590,9 @@ static bool
|
||||||
columnar_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot,
|
columnar_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot,
|
||||||
Snapshot snapshot)
|
Snapshot snapshot)
|
||||||
{
|
{
|
||||||
return true;
|
uint64 rowNumber = tid_to_row_number(slot->tts_tid);
|
||||||
|
StripeMetadata *stripeMetadata = FindStripeByRowNumber(rel, rowNumber, snapshot);
|
||||||
|
return stripeMetadata != NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -800,10 +842,13 @@ columnar_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
||||||
/* no quals for table rewrite */
|
/* no quals for table rewrite */
|
||||||
List *scanQual = NIL;
|
List *scanQual = NIL;
|
||||||
|
|
||||||
|
/* use SnapshotAny when re-writing table as heapAM does */
|
||||||
|
Snapshot snapshot = SnapshotAny;
|
||||||
|
|
||||||
MemoryContext scanContext = CreateColumnarScanMemoryContext();
|
MemoryContext scanContext = CreateColumnarScanMemoryContext();
|
||||||
ColumnarReadState *readState = init_columnar_read_state(OldHeap, sourceDesc,
|
ColumnarReadState *readState = init_columnar_read_state(OldHeap, sourceDesc,
|
||||||
attr_needed, scanQual,
|
attr_needed, scanQual,
|
||||||
scanContext);
|
scanContext, snapshot);
|
||||||
|
|
||||||
Datum *values = palloc0(sourceDesc->natts * sizeof(Datum));
|
Datum *values = palloc0(sourceDesc->natts * sizeof(Datum));
|
||||||
bool *nulls = palloc0(sourceDesc->natts * sizeof(bool));
|
bool *nulls = palloc0(sourceDesc->natts * sizeof(bool));
|
||||||
|
@ -911,7 +956,8 @@ LogRelationStats(Relation rel, int elevel)
|
||||||
StripeMetadata *stripe = lfirst(stripeMetadataCell);
|
StripeMetadata *stripe = lfirst(stripeMetadataCell);
|
||||||
StripeSkipList *skiplist = ReadStripeSkipList(relfilenode, stripe->id,
|
StripeSkipList *skiplist = ReadStripeSkipList(relfilenode, stripe->id,
|
||||||
RelationGetDescr(rel),
|
RelationGetDescr(rel),
|
||||||
stripe->chunkCount);
|
stripe->chunkCount,
|
||||||
|
GetTransactionSnapshot());
|
||||||
for (uint32 column = 0; column < skiplist->columnCount; column++)
|
for (uint32 column = 0; column < skiplist->columnCount; column++)
|
||||||
{
|
{
|
||||||
bool attrDropped = tupdesc->attrs[column].attisdropped;
|
bool attrDropped = tupdesc->attrs[column].attisdropped;
|
||||||
|
|
|
@ -213,13 +213,15 @@ extern ColumnarReadState * ColumnarBeginRead(Relation relation,
|
||||||
TupleDesc tupleDescriptor,
|
TupleDesc tupleDescriptor,
|
||||||
List *projectedColumnList,
|
List *projectedColumnList,
|
||||||
List *qualConditions,
|
List *qualConditions,
|
||||||
MemoryContext scanContext);
|
MemoryContext scanContext,
|
||||||
|
Snapshot snaphot,
|
||||||
|
bool snapshotRegisteredByUs);
|
||||||
extern bool ColumnarReadNextRow(ColumnarReadState *state, Datum *columnValues,
|
extern bool ColumnarReadNextRow(ColumnarReadState *state, Datum *columnValues,
|
||||||
bool *columnNulls, uint64 *rowNumber);
|
bool *columnNulls, uint64 *rowNumber);
|
||||||
extern void ColumnarRescan(ColumnarReadState *readState);
|
extern void ColumnarRescan(ColumnarReadState *readState);
|
||||||
extern bool ColumnarReadRowByRowNumber(ColumnarReadState *readState,
|
extern bool ColumnarReadRowByRowNumber(ColumnarReadState *readState,
|
||||||
uint64 rowNumber, Datum *columnValues,
|
uint64 rowNumber, Datum *columnValues,
|
||||||
bool *columnNulls, Snapshot snapshot);
|
bool *columnNulls);
|
||||||
extern void ColumnarEndRead(ColumnarReadState *state);
|
extern void ColumnarEndRead(ColumnarReadState *state);
|
||||||
extern void ColumnarResetRead(ColumnarReadState *readState);
|
extern void ColumnarResetRead(ColumnarReadState *readState);
|
||||||
extern int64 ColumnarReadChunkGroupsFiltered(ColumnarReadState *state);
|
extern int64 ColumnarReadChunkGroupsFiltered(ColumnarReadState *state);
|
||||||
|
@ -255,7 +257,8 @@ extern void SaveChunkGroups(RelFileNode relfilenode, uint64 stripe,
|
||||||
List *chunkGroupRowCounts);
|
List *chunkGroupRowCounts);
|
||||||
extern StripeSkipList * ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe,
|
extern StripeSkipList * ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe,
|
||||||
TupleDesc tupleDescriptor,
|
TupleDesc tupleDescriptor,
|
||||||
uint32 chunkCount);
|
uint32 chunkCount,
|
||||||
|
Snapshot snapshot);
|
||||||
extern StripeMetadata * FindNextStripeByRowNumber(Relation relation, uint64 rowNumber,
|
extern StripeMetadata * FindNextStripeByRowNumber(Relation relation, uint64 rowNumber,
|
||||||
Snapshot snapshot);
|
Snapshot snapshot);
|
||||||
extern StripeMetadata * FindStripeByRowNumber(Relation relation, uint64 rowNumber,
|
extern StripeMetadata * FindStripeByRowNumber(Relation relation, uint64 rowNumber,
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
--
|
--
|
||||||
-- Testing insert on columnar tables.
|
-- Testing insert on columnar tables.
|
||||||
--
|
--
|
||||||
|
CREATE SCHEMA columnar_insert;
|
||||||
|
SET search_path TO columnar_insert;
|
||||||
CREATE TABLE test_insert_command (a int) USING columnar;
|
CREATE TABLE test_insert_command (a int) USING columnar;
|
||||||
-- test single row inserts fail
|
-- test single row inserts fail
|
||||||
select count(*) from test_insert_command;
|
select count(*) from test_insert_command;
|
||||||
|
@ -228,4 +230,67 @@ ORDER BY 1,2,3,4;
|
||||||
zero_col | 5 | 0 | 64
|
zero_col | 5 | 0 | 64
|
||||||
(5 rows)
|
(5 rows)
|
||||||
|
|
||||||
DROP TABLE zero_col;
|
CREATE TABLE selfinsert(x int) USING columnar;
|
||||||
|
SELECT alter_columnar_table_set('selfinsert', stripe_row_limit => 1000);
|
||||||
|
alter_columnar_table_set
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO selfinsert SELECT generate_series(1,1010);
|
||||||
|
INSERT INTO selfinsert SELECT * FROM selfinsert;
|
||||||
|
SELECT SUM(x)=1021110 FROM selfinsert;
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||||
|
INSERT INTO selfinsert SELECT generate_series(1,1010);
|
||||||
|
INSERT INTO selfinsert SELECT * FROM selfinsert;
|
||||||
|
SELECT SUM(x)=1021110 FROM selfinsert;
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
INSERT INTO selfinsert SELECT generate_series(1,1010);
|
||||||
|
INSERT INTO selfinsert SELECT * FROM selfinsert;
|
||||||
|
SELECT SUM(x)=1021110 FROM selfinsert;
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE selfconflict (f1 int PRIMARY KEY, f2 int) USING columnar;
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||||
|
INSERT INTO selfconflict VALUES (2,1), (2,2);
|
||||||
|
ERROR: duplicate key value violates unique constraint "selfconflict_pkey"
|
||||||
|
DETAIL: Key (f1)=(2) already exists.
|
||||||
|
COMMIT;
|
||||||
|
SELECT COUNT(*)=0 FROM selfconflict;
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE flush_create_index(a int, b int) USING columnar;
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO flush_create_index VALUES (5, 10);
|
||||||
|
SET enable_seqscan TO OFF;
|
||||||
|
SET columnar.enable_custom_scan TO OFF;
|
||||||
|
SET enable_indexscan TO ON;
|
||||||
|
CREATE INDEX ON flush_create_index(a);
|
||||||
|
SELECT a FROM flush_create_index WHERE a=5;
|
||||||
|
a
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
5
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
RESET search_path;
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
DROP SCHEMA columnar_insert CASCADE;
|
||||||
|
|
|
@ -202,3 +202,43 @@ stripe_metadata_for_test_insert_concurrency_ok
|
||||||
t
|
t
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s1-begin s2-begin-repeatable s1-insert s2-insert s2-select s1-commit s2-select s2-commit
|
||||||
|
step s1-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s2-begin-repeatable:
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||||
|
|
||||||
|
step s1-insert:
|
||||||
|
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(1, 3) i;
|
||||||
|
|
||||||
|
step s2-insert:
|
||||||
|
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i;
|
||||||
|
|
||||||
|
step s2-select:
|
||||||
|
SELECT * FROM test_insert_concurrency ORDER BY a;
|
||||||
|
|
||||||
|
a| b
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
4| 8
|
||||||
|
5|10
|
||||||
|
6|12
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s2-select:
|
||||||
|
SELECT * FROM test_insert_concurrency ORDER BY a;
|
||||||
|
|
||||||
|
a| b
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
4| 8
|
||||||
|
5|10
|
||||||
|
6|12
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
step s2-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
|
|
@ -74,6 +74,11 @@ step "s2-begin"
|
||||||
BEGIN;
|
BEGIN;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
step "s2-begin-repeatable"
|
||||||
|
{
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||||
|
}
|
||||||
|
|
||||||
step "s2-insert"
|
step "s2-insert"
|
||||||
{
|
{
|
||||||
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i;
|
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i;
|
||||||
|
@ -103,3 +108,5 @@ permutation "s1-begin" "s2-begin" "s2-insert" "s1-copy" "s1-select" "s2-select"
|
||||||
# Then verify that while the stripe written by session 2 has the greater first_row_number, stripe written by session 1 has
|
# Then verify that while the stripe written by session 2 has the greater first_row_number, stripe written by session 1 has
|
||||||
# the greater stripe_num. This is because, we reserve stripe_num and first_row_number at different times.
|
# the greater stripe_num. This is because, we reserve stripe_num and first_row_number at different times.
|
||||||
permutation "s1-truncate" "s1-begin" "s1-insert-10000-rows" "s2-begin" "s2-insert" "s2-commit" "s1-commit" "s1-verify-metadata"
|
permutation "s1-truncate" "s1-begin" "s1-insert-10000-rows" "s2-begin" "s2-insert" "s2-commit" "s1-commit" "s1-verify-metadata"
|
||||||
|
|
||||||
|
permutation "s1-begin" "s2-begin-repeatable" "s1-insert" "s2-insert" "s2-select" "s1-commit" "s2-select" "s2-commit"
|
||||||
|
|
|
@ -2,6 +2,9 @@
|
||||||
-- Testing insert on columnar tables.
|
-- Testing insert on columnar tables.
|
||||||
--
|
--
|
||||||
|
|
||||||
|
CREATE SCHEMA columnar_insert;
|
||||||
|
SET search_path TO columnar_insert;
|
||||||
|
|
||||||
CREATE TABLE test_insert_command (a int) USING columnar;
|
CREATE TABLE test_insert_command (a int) USING columnar;
|
||||||
|
|
||||||
-- test single row inserts fail
|
-- test single row inserts fail
|
||||||
|
@ -152,4 +155,51 @@ SELECT relname, stripe_num, chunk_group_num, row_count FROM columnar.chunk_group
|
||||||
WHERE columnar_test_helpers.columnar_relation_storageid(b.oid)=a.storage_id AND relname = 'zero_col'
|
WHERE columnar_test_helpers.columnar_relation_storageid(b.oid)=a.storage_id AND relname = 'zero_col'
|
||||||
ORDER BY 1,2,3,4;
|
ORDER BY 1,2,3,4;
|
||||||
|
|
||||||
DROP TABLE zero_col;
|
CREATE TABLE selfinsert(x int) USING columnar;
|
||||||
|
|
||||||
|
SELECT alter_columnar_table_set('selfinsert', stripe_row_limit => 1000);
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO selfinsert SELECT generate_series(1,1010);
|
||||||
|
INSERT INTO selfinsert SELECT * FROM selfinsert;
|
||||||
|
|
||||||
|
SELECT SUM(x)=1021110 FROM selfinsert;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||||
|
INSERT INTO selfinsert SELECT generate_series(1,1010);
|
||||||
|
INSERT INTO selfinsert SELECT * FROM selfinsert;
|
||||||
|
|
||||||
|
SELECT SUM(x)=1021110 FROM selfinsert;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
INSERT INTO selfinsert SELECT generate_series(1,1010);
|
||||||
|
INSERT INTO selfinsert SELECT * FROM selfinsert;
|
||||||
|
|
||||||
|
SELECT SUM(x)=1021110 FROM selfinsert;
|
||||||
|
|
||||||
|
CREATE TABLE selfconflict (f1 int PRIMARY KEY, f2 int) USING columnar;
|
||||||
|
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||||
|
INSERT INTO selfconflict VALUES (2,1), (2,2);
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
SELECT COUNT(*)=0 FROM selfconflict;
|
||||||
|
|
||||||
|
CREATE TABLE flush_create_index(a int, b int) USING columnar;
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO flush_create_index VALUES (5, 10);
|
||||||
|
|
||||||
|
SET enable_seqscan TO OFF;
|
||||||
|
SET columnar.enable_custom_scan TO OFF;
|
||||||
|
SET enable_indexscan TO ON;
|
||||||
|
|
||||||
|
CREATE INDEX ON flush_create_index(a);
|
||||||
|
|
||||||
|
SELECT a FROM flush_create_index WHERE a=5;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
RESET search_path;
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
DROP SCHEMA columnar_insert CASCADE;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue