From 0b4ed075b542b17fd9fafed28bf5541651efb63e Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Thu, 29 Jul 2021 17:52:13 +0300 Subject: [PATCH 1/4] Use correct snapshot when reading a columnar table Instead of using xact snapshot, use the snapshot provided to columnarAM when scanning table. --- src/backend/columnar/columnar_metadata.c | 15 +++++----- src/backend/columnar/columnar_reader.c | 36 +++++++++++++++--------- src/backend/columnar/columnar_tableam.c | 23 +++++++++------ src/include/columnar/columnar.h | 8 ++++-- 4 files changed, 51 insertions(+), 31 deletions(-) diff --git a/src/backend/columnar/columnar_metadata.c b/src/backend/columnar/columnar_metadata.c index 1150dc1ac..9d0dc2658 100644 --- a/src/backend/columnar/columnar_metadata.c +++ b/src/backend/columnar/columnar_metadata.c @@ -88,7 +88,7 @@ static void GetHighestUsedAddressAndId(uint64 storageId, static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot); static StripeMetadata * BuildStripeMetadata(Datum *datumArray); static uint32 * ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 - chunkGroupCount); + chunkGroupCount, Snapshot snapshot); static Oid ColumnarStorageIdSequenceRelationId(void); static Oid ColumnarStripeRelationId(void); static Oid ColumnarStripePKeyIndexRelationId(void); @@ -532,7 +532,7 @@ SaveChunkGroups(RelFileNode relfilenode, uint64 stripe, */ StripeSkipList * ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescriptor, - uint32 chunkCount) + uint32 chunkCount, Snapshot snapshot) { int32 columnIndex = 0; HeapTuple heapTuple = NULL; @@ -550,8 +550,8 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri ScanKeyInit(&scanKey[1], Anum_columnar_chunk_stripe, BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe)); - SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarChunk, index, NULL, - 2, scanKey); + SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarChunk, index, + snapshot, 2, scanKey); StripeSkipList *chunkList = palloc0(sizeof(StripeSkipList)); chunkList->chunkCount = chunkCount; @@ -634,7 +634,7 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri table_close(columnarChunk, AccessShareLock); chunkList->chunkGroupRowCounts = - ReadChunkGroupRowCounts(storageId, stripe, chunkCount); + ReadChunkGroupRowCounts(storageId, stripe, chunkCount, snapshot); return chunkList; } @@ -803,7 +803,8 @@ FindStripeWithHighestRowNumber(Relation relation, Snapshot snapshot) * given stripe. */ static uint32 * -ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount) +ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount, + Snapshot snapshot) { Oid columnarChunkGroupOid = ColumnarChunkGroupRelationId(); Relation columnarChunkGroup = table_open(columnarChunkGroupOid, AccessShareLock); @@ -816,7 +817,7 @@ ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount) BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe)); SysScanDesc scanDescriptor = - systable_beginscan_ordered(columnarChunkGroup, index, NULL, 2, scanKey); + systable_beginscan_ordered(columnarChunkGroup, index, snapshot, 2, scanKey); uint32 chunkGroupIndex = 0; HeapTuple heapTuple = NULL; diff --git a/src/backend/columnar/columnar_reader.c b/src/backend/columnar/columnar_reader.c index 96efdbffc..87b4131ce 100644 --- a/src/backend/columnar/columnar_reader.c +++ b/src/backend/columnar/columnar_reader.c @@ -88,6 +88,8 @@ struct ColumnarReadState * itself. */ MemoryContext scanContext; + + Snapshot snapshot; }; /* static function declarations */ @@ -109,7 +111,8 @@ static bool HasUnreadStripe(ColumnarReadState *readState); static StripeReadState * BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDesc, List *projectedColumnList, List *whereClauseList, List *whereClauseVars, - MemoryContext stripeReadContext); + MemoryContext stripeReadContext, + Snapshot snapshot); static void AdvanceStripeRead(ColumnarReadState *readState); static bool ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues, bool *columnNulls); @@ -128,7 +131,8 @@ static StripeBuffers * LoadFilteredStripeBuffers(Relation relation, List *projectedColumnList, List *whereClauseList, List *whereClauseVars, - int64 *chunkGroupsFiltered); + int64 *chunkGroupsFiltered, + Snapshot snapshot); static ColumnBuffers * LoadColumnBuffers(Relation relation, ColumnChunkSkipNode *chunkSkipNodeArray, uint32 chunkCount, uint64 stripeOffset, @@ -167,7 +171,7 @@ static Datum ColumnDefaultValue(TupleConstr *tupleConstraints, ColumnarReadState * ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor, List *projectedColumnList, List *whereClauseList, - MemoryContext scanContext) + MemoryContext scanContext, Snapshot snapshot) { /* * We allocate all stripe specific data in the stripeReadContext, and reset @@ -187,8 +191,9 @@ ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor, readState->stripeReadState = NULL; readState->currentStripeMetadata = FindNextStripeByRowNumber(relation, COLUMNAR_INVALID_ROW_NUMBER, - GetTransactionSnapshot()); + snapshot); readState->scanContext = scanContext; + readState->snapshot = snapshot; return readState; } @@ -230,7 +235,8 @@ ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *col readState->projectedColumnList, readState->whereClauseList, readState->whereClauseVars, - readState->stripeReadContext); + readState->stripeReadContext, + readState->snapshot); } if (!ReadStripeNextRow(readState->stripeReadState, columnValues, columnNulls)) @@ -260,11 +266,12 @@ ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *col bool ColumnarReadRowByRowNumber(ColumnarReadState *readState, uint64 rowNumber, Datum *columnValues, - bool *columnNulls, Snapshot snapshot) + bool *columnNulls) { if (!ColumnarReadIsCurrentStripe(readState, rowNumber)) { Relation columnarRelation = readState->relation; + Snapshot snapshot = readState->snapshot; StripeMetadata *stripeMetadata = FindStripeByRowNumber(columnarRelation, rowNumber, snapshot); if (stripeMetadata == NULL) @@ -286,7 +293,8 @@ ColumnarReadRowByRowNumber(ColumnarReadState *readState, readState->projectedColumnList, whereClauseList, whereClauseVars, - stripeReadContext); + stripeReadContext, + snapshot); readState->currentStripeMetadata = stripeMetadata; } @@ -446,7 +454,7 @@ ColumnarRescan(ColumnarReadState *readState) ColumnarResetRead(readState); readState->currentStripeMetadata = FindNextStripeByRowNumber(readState->relation, COLUMNAR_INVALID_ROW_NUMBER, - GetTransactionSnapshot()); + readState->snapshot); readState->chunkGroupsFiltered = 0; MemoryContextSwitchTo(oldContext); @@ -493,7 +501,7 @@ ColumnarResetRead(ColumnarReadState *readState) static StripeReadState * BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDesc, List *projectedColumnList, List *whereClauseList, List *whereClauseVars, - MemoryContext stripeReadContext) + MemoryContext stripeReadContext, Snapshot snapshot) { MemoryContext oldContext = MemoryContextSwitchTo(stripeReadContext); @@ -513,7 +521,8 @@ BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDes whereClauseList, whereClauseVars, &stripeReadState-> - chunkGroupsFiltered); + chunkGroupsFiltered, + snapshot); stripeReadState->rowCount = stripeReadState->stripeBuffers->rowCount; @@ -540,7 +549,7 @@ AdvanceStripeRead(ColumnarReadState *readState) StripeGetHighestRowNumber(readState->currentStripeMetadata); readState->currentStripeMetadata = FindNextStripeByRowNumber(readState->relation, lastReadRowNumber, - GetTransactionSnapshot()); + readState->snapshot); readState->stripeReadState = NULL; MemoryContextReset(readState->stripeReadContext); @@ -792,7 +801,7 @@ static StripeBuffers * LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata, TupleDesc tupleDescriptor, List *projectedColumnList, List *whereClauseList, List *whereClauseVars, - int64 *chunkGroupsFiltered) + int64 *chunkGroupsFiltered, Snapshot snapshot) { uint32 columnIndex = 0; uint32 columnCount = tupleDescriptor->natts; @@ -802,7 +811,8 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata, StripeSkipList *stripeSkipList = ReadStripeSkipList(relation->rd_node, stripeMetadata->id, tupleDescriptor, - stripeMetadata->chunkCount); + stripeMetadata->chunkCount, + snapshot); bool *selectedChunkMask = SelectedChunkMask(stripeSkipList, whereClauseList, whereClauseVars, chunkGroupsFiltered); diff --git a/src/backend/columnar/columnar_tableam.c b/src/backend/columnar/columnar_tableam.c index 6630d0430..ea8c6df1e 100644 --- a/src/backend/columnar/columnar_tableam.c +++ b/src/backend/columnar/columnar_tableam.c @@ -253,13 +253,13 @@ CreateColumnarScanMemoryContext(void) */ static ColumnarReadState * init_columnar_read_state(Relation relation, TupleDesc tupdesc, Bitmapset *attr_needed, - List *scanQual, MemoryContext scanContext) + List *scanQual, MemoryContext scanContext, Snapshot snapshot) { MemoryContext oldContext = MemoryContextSwitchTo(scanContext); List *neededColumnList = NeededColumnsList(tupdesc, attr_needed); ColumnarReadState *readState = ColumnarBeginRead(relation, tupdesc, neededColumnList, - scanQual, scanContext); + scanQual, scanContext, snapshot); MemoryContextSwitchTo(oldContext); @@ -309,7 +309,7 @@ columnar_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlo scan->cs_readState = init_columnar_read_state(scan->cs_base.rs_rd, slot->tts_tupleDescriptor, scan->attr_needed, scan->scanQual, - scan->scanContext); + scan->scanContext, scan->cs_base.rs_snapshot); } ExecClearTuple(slot); @@ -503,12 +503,13 @@ columnar_index_fetch_tuple(struct IndexFetchTableData *sscan, scan->cs_readState = init_columnar_read_state(columnarRelation, slot->tts_tupleDescriptor, attr_needed, scanQual, - scan->scanContext); + scan->scanContext, + snapshot); } uint64 rowNumber = tid_to_row_number(*tid); if (!ColumnarReadRowByRowNumber(scan->cs_readState, rowNumber, slot->tts_values, - slot->tts_isnull, snapshot)) + slot->tts_isnull)) { return false; } @@ -550,7 +551,9 @@ static bool columnar_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot, Snapshot snapshot) { - return true; + uint64 rowNumber = tid_to_row_number(slot->tts_tid); + StripeMetadata *stripeMetadata = FindStripeByRowNumber(rel, rowNumber, snapshot); + return stripeMetadata != NULL; } @@ -800,10 +803,13 @@ columnar_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, /* no quals for table rewrite */ List *scanQual = NIL; + /* use SnapshotAny when re-writing table as heapAM does */ + Snapshot snapshot = SnapshotAny; + MemoryContext scanContext = CreateColumnarScanMemoryContext(); ColumnarReadState *readState = init_columnar_read_state(OldHeap, sourceDesc, attr_needed, scanQual, - scanContext); + scanContext, snapshot); Datum *values = palloc0(sourceDesc->natts * sizeof(Datum)); bool *nulls = palloc0(sourceDesc->natts * sizeof(bool)); @@ -911,7 +917,8 @@ LogRelationStats(Relation rel, int elevel) StripeMetadata *stripe = lfirst(stripeMetadataCell); StripeSkipList *skiplist = ReadStripeSkipList(relfilenode, stripe->id, RelationGetDescr(rel), - stripe->chunkCount); + stripe->chunkCount, + GetTransactionSnapshot()); for (uint32 column = 0; column < skiplist->columnCount; column++) { bool attrDropped = tupdesc->attrs[column].attisdropped; diff --git a/src/include/columnar/columnar.h b/src/include/columnar/columnar.h index a44072ffe..6ca7450ac 100644 --- a/src/include/columnar/columnar.h +++ b/src/include/columnar/columnar.h @@ -213,13 +213,14 @@ extern ColumnarReadState * ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor, List *projectedColumnList, List *qualConditions, - MemoryContext scanContext); + MemoryContext scanContext, + Snapshot snaphot); extern bool ColumnarReadNextRow(ColumnarReadState *state, Datum *columnValues, bool *columnNulls, uint64 *rowNumber); extern void ColumnarRescan(ColumnarReadState *readState); extern bool ColumnarReadRowByRowNumber(ColumnarReadState *readState, uint64 rowNumber, Datum *columnValues, - bool *columnNulls, Snapshot snapshot); + bool *columnNulls); extern void ColumnarEndRead(ColumnarReadState *state); extern void ColumnarResetRead(ColumnarReadState *readState); extern int64 ColumnarReadChunkGroupsFiltered(ColumnarReadState *state); @@ -255,7 +256,8 @@ extern void SaveChunkGroups(RelFileNode relfilenode, uint64 stripe, List *chunkGroupRowCounts); extern StripeSkipList * ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescriptor, - uint32 chunkCount); + uint32 chunkCount, + Snapshot snapshot); extern StripeMetadata * FindNextStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot); extern StripeMetadata * FindStripeByRowNumber(Relation relation, uint64 rowNumber, From db0e4ce889366802d4aab2c58015eaf929dd3f82 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Thu, 29 Jul 2021 16:13:18 +0300 Subject: [PATCH 2/4] Increment command counter in FinishModifyRelation instead Seems that we always increment the command counter right after finishing metadata table modification. For this reason, it makes sense to call CommandCounterIncrement within FinishModifyRelation. --- src/backend/columnar/columnar_metadata.c | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/backend/columnar/columnar_metadata.c b/src/backend/columnar/columnar_metadata.c index 9d0dc2658..79c7946d2 100644 --- a/src/backend/columnar/columnar_metadata.c +++ b/src/backend/columnar/columnar_metadata.c @@ -484,8 +484,6 @@ SaveStripeSkipList(RelFileNode relfilenode, uint64 stripe, StripeSkipList *chunk FinishModifyRelation(modifyState); table_close(columnarChunk, RowExclusiveLock); - - CommandCounterIncrement(); } @@ -522,8 +520,6 @@ SaveChunkGroups(RelFileNode relfilenode, uint64 stripe, FinishModifyRelation(modifyState); table_close(columnarChunkGroup, NoLock); - - CommandCounterIncrement(); } @@ -887,8 +883,6 @@ InsertStripeMetadataRow(uint64 storageId, StripeMetadata *stripe) FinishModifyRelation(modifyState); - CommandCounterIncrement(); - table_close(columnarStripes, RowExclusiveLock); } @@ -1206,6 +1200,8 @@ FinishModifyRelation(ModifyState *state) ExecCleanUpTriggerState(state->estate); ExecResetTupleTable(state->estate->es_tupleTable, false); FreeExecutorState(state->estate); + + CommandCounterIncrement(); } From 6c26c67ea09db4e95a93f3eaefea72d20e54b20c Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Thu, 29 Jul 2021 19:17:03 +0300 Subject: [PATCH 3/4] Flush write state when initializing read state In next commit, we will adjust curcid of the snapshot being used when scanning the columnar table. However, for index scan, snapshot is provided not when beginning scan but within fetch-tuple call. For this reason, start flushing pending writes in init_columnar_read_state since this seem to be a prerequisite step that needs to be done before scanning a columnar table regardless of the scan method being used. --- src/backend/columnar/columnar_tableam.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/backend/columnar/columnar_tableam.c b/src/backend/columnar/columnar_tableam.c index ea8c6df1e..572c74e28 100644 --- a/src/backend/columnar/columnar_tableam.c +++ b/src/backend/columnar/columnar_tableam.c @@ -227,8 +227,6 @@ columnar_beginscan_extended(Relation relation, Snapshot snapshot, "cannot read from table when there is unflushed data in upper transactions"); } - FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId()); - MemoryContextSwitchTo(oldContext); return ((TableScanDesc) scan); @@ -257,6 +255,9 @@ init_columnar_read_state(Relation relation, TupleDesc tupdesc, Bitmapset *attr_n { MemoryContext oldContext = MemoryContextSwitchTo(scanContext); + Oid relfilenode = relation->rd_node.relNode; + FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId()); + List *neededColumnList = NeededColumnsList(tupdesc, attr_needed); ColumnarReadState *readState = ColumnarBeginRead(relation, tupdesc, neededColumnList, scanQual, scanContext, snapshot); @@ -429,8 +430,6 @@ columnar_index_fetch_begin(Relation rel) "upper transactions"); } - FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId()); - MemoryContext scanContext = CreateColumnarScanMemoryContext(); MemoryContext oldContext = MemoryContextSwitchTo(scanContext); From bf4dfad6f76a591403551de8596fa4392adeca67 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Thu, 29 Jul 2021 19:19:09 +0300 Subject: [PATCH 4/4] Update curcid of given snapshot if it is MVCC MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Before starting to scan a columnar table, we always flush the pending writes to disk. However, we increment command counter after modifying metadata tables. On the other hand, now that we _don't always use_ xact snapshot to scan a columnar table, writes that we just flushed might not be visible to the query that just flushed pending writes to disk since curcid of provided snapshot would become smaller than the command id being used when modifying metadata tables. To give an example, before this change, below was a possible scenario due to the changes that we made to use the correct snapshot. ```sql CREATE TABLE t(a int, b int) USING columnar; BEGIN; INSERT INTO t VALUES (5, 10); SELECT * FROM t; ┌───┬───┐ │ a │ b │ ├───┼───┤ └───┴───┘ (0 rows) SELECT * FROM t; ┌───┬────┐ │ a │ b │ ├───┼────┤ │ 5 │ 10 │ └───┴────┘ (1 row) ``` --- src/backend/columnar/columnar_reader.c | 14 +++- src/backend/columnar/columnar_tableam.c | 42 +++++++++++- src/include/columnar/columnar.h | 3 +- src/test/regress/expected/columnar_insert.out | 67 ++++++++++++++++++- .../expected/columnar_write_concurrency.out | 40 +++++++++++ .../spec/columnar_write_concurrency.spec | 7 ++ src/test/regress/sql/columnar_insert.sql | 52 +++++++++++++- 7 files changed, 220 insertions(+), 5 deletions(-) diff --git a/src/backend/columnar/columnar_reader.c b/src/backend/columnar/columnar_reader.c index 87b4131ce..61762160b 100644 --- a/src/backend/columnar/columnar_reader.c +++ b/src/backend/columnar/columnar_reader.c @@ -90,6 +90,7 @@ struct ColumnarReadState MemoryContext scanContext; Snapshot snapshot; + bool snapshotRegisteredByUs; }; /* static function declarations */ @@ -171,7 +172,8 @@ static Datum ColumnDefaultValue(TupleConstr *tupleConstraints, ColumnarReadState * ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor, List *projectedColumnList, List *whereClauseList, - MemoryContext scanContext, Snapshot snapshot) + MemoryContext scanContext, Snapshot snapshot, + bool snapshotRegisteredByUs) { /* * We allocate all stripe specific data in the stripeReadContext, and reset @@ -194,6 +196,7 @@ ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor, snapshot); readState->scanContext = scanContext; readState->snapshot = snapshot; + readState->snapshotRegisteredByUs = snapshotRegisteredByUs; return readState; } @@ -467,6 +470,15 @@ ColumnarRescan(ColumnarReadState *readState) void ColumnarEndRead(ColumnarReadState *readState) { + if (readState->snapshotRegisteredByUs) + { + /* + * init_columnar_read_state created a new snapshot and registered it, + * so now forget it. + */ + UnregisterSnapshot(readState->snapshot); + } + MemoryContextDelete(readState->stripeReadContext); if (readState->currentStripeMetadata) { diff --git a/src/backend/columnar/columnar_tableam.c b/src/backend/columnar/columnar_tableam.c index 572c74e28..83e395fd5 100644 --- a/src/backend/columnar/columnar_tableam.c +++ b/src/backend/columnar/columnar_tableam.c @@ -258,9 +258,49 @@ init_columnar_read_state(Relation relation, TupleDesc tupdesc, Bitmapset *attr_n Oid relfilenode = relation->rd_node.relNode; FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId()); + bool snapshotRegisteredByUs = false; + if (snapshot != InvalidSnapshot && IsMVCCSnapshot(snapshot)) + { + /* + * If we flushed any pending writes, then we should guarantee that + * those writes are visible to us too. For this reason, if given + * snapshot is an MVCC snapshot, then we set its curcid to current + * command id. + * + * For simplicity, we do that even if we didn't flush any writes + * since we don't see any problem with that. + * + * XXX: We should either not update cid if we are executing a FETCH + * (from cursor) command, or we should have a better way to deal with + * pending writes, see the discussion in + * https://github.com/citusdata/citus/issues/5231. + */ + PushCopiedSnapshot(snapshot); + + /* now our snapshot is the active one */ + UpdateActiveSnapshotCommandId(); + snapshot = GetActiveSnapshot(); + RegisterSnapshot(snapshot); + + /* + * To be able to use UpdateActiveSnapshotCommandId, we pushed the + * copied snapshot to the stack. However, we don't need to keep it + * there since we will anyway rely on ColumnarReadState->snapshot + * during read operation. + * + * Note that since we registered the snapshot already, we guarantee + * that PopActiveSnapshot won't free it. + */ + PopActiveSnapshot(); + + /* not forget to unregister it when finishing read operation */ + snapshotRegisteredByUs = true; + } + List *neededColumnList = NeededColumnsList(tupdesc, attr_needed); ColumnarReadState *readState = ColumnarBeginRead(relation, tupdesc, neededColumnList, - scanQual, scanContext, snapshot); + scanQual, scanContext, snapshot, + snapshotRegisteredByUs); MemoryContextSwitchTo(oldContext); diff --git a/src/include/columnar/columnar.h b/src/include/columnar/columnar.h index 6ca7450ac..40cb81e70 100644 --- a/src/include/columnar/columnar.h +++ b/src/include/columnar/columnar.h @@ -214,7 +214,8 @@ extern ColumnarReadState * ColumnarBeginRead(Relation relation, List *projectedColumnList, List *qualConditions, MemoryContext scanContext, - Snapshot snaphot); + Snapshot snaphot, + bool snapshotRegisteredByUs); extern bool ColumnarReadNextRow(ColumnarReadState *state, Datum *columnValues, bool *columnNulls, uint64 *rowNumber); extern void ColumnarRescan(ColumnarReadState *readState); diff --git a/src/test/regress/expected/columnar_insert.out b/src/test/regress/expected/columnar_insert.out index 75dc04ad7..edbd21c53 100644 --- a/src/test/regress/expected/columnar_insert.out +++ b/src/test/regress/expected/columnar_insert.out @@ -1,6 +1,8 @@ -- -- Testing insert on columnar tables. -- +CREATE SCHEMA columnar_insert; +SET search_path TO columnar_insert; CREATE TABLE test_insert_command (a int) USING columnar; -- test single row inserts fail select count(*) from test_insert_command; @@ -228,4 +230,67 @@ ORDER BY 1,2,3,4; zero_col | 5 | 0 | 64 (5 rows) -DROP TABLE zero_col; +CREATE TABLE selfinsert(x int) USING columnar; +SELECT alter_columnar_table_set('selfinsert', stripe_row_limit => 1000); + alter_columnar_table_set +--------------------------------------------------------------------- + +(1 row) + +BEGIN; + INSERT INTO selfinsert SELECT generate_series(1,1010); + INSERT INTO selfinsert SELECT * FROM selfinsert; + SELECT SUM(x)=1021110 FROM selfinsert; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; + INSERT INTO selfinsert SELECT generate_series(1,1010); + INSERT INTO selfinsert SELECT * FROM selfinsert; + SELECT SUM(x)=1021110 FROM selfinsert; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +INSERT INTO selfinsert SELECT generate_series(1,1010); +INSERT INTO selfinsert SELECT * FROM selfinsert; +SELECT SUM(x)=1021110 FROM selfinsert; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +CREATE TABLE selfconflict (f1 int PRIMARY KEY, f2 int) USING columnar; +BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; + INSERT INTO selfconflict VALUES (2,1), (2,2); +ERROR: duplicate key value violates unique constraint "selfconflict_pkey" +DETAIL: Key (f1)=(2) already exists. +COMMIT; +SELECT COUNT(*)=0 FROM selfconflict; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +CREATE TABLE flush_create_index(a int, b int) USING columnar; +BEGIN; + INSERT INTO flush_create_index VALUES (5, 10); + SET enable_seqscan TO OFF; + SET columnar.enable_custom_scan TO OFF; + SET enable_indexscan TO ON; + CREATE INDEX ON flush_create_index(a); + SELECT a FROM flush_create_index WHERE a=5; + a +--------------------------------------------------------------------- + 5 +(1 row) + +ROLLBACK; +RESET search_path; +SET client_min_messages TO WARNING; +DROP SCHEMA columnar_insert CASCADE; diff --git a/src/test/regress/expected/columnar_write_concurrency.out b/src/test/regress/expected/columnar_write_concurrency.out index 2d98674f7..88a4dc0e7 100644 --- a/src/test/regress/expected/columnar_write_concurrency.out +++ b/src/test/regress/expected/columnar_write_concurrency.out @@ -202,3 +202,43 @@ stripe_metadata_for_test_insert_concurrency_ok t (1 row) + +starting permutation: s1-begin s2-begin-repeatable s1-insert s2-insert s2-select s1-commit s2-select s2-commit +step s1-begin: + BEGIN; + +step s2-begin-repeatable: + BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; + +step s1-insert: + INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(1, 3) i; + +step s2-insert: + INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i; + +step s2-select: + SELECT * FROM test_insert_concurrency ORDER BY a; + +a| b +--------------------------------------------------------------------- +4| 8 +5|10 +6|12 +(3 rows) + +step s1-commit: + COMMIT; + +step s2-select: + SELECT * FROM test_insert_concurrency ORDER BY a; + +a| b +--------------------------------------------------------------------- +4| 8 +5|10 +6|12 +(3 rows) + +step s2-commit: + COMMIT; + diff --git a/src/test/regress/spec/columnar_write_concurrency.spec b/src/test/regress/spec/columnar_write_concurrency.spec index edfead34a..4b29c24a8 100644 --- a/src/test/regress/spec/columnar_write_concurrency.spec +++ b/src/test/regress/spec/columnar_write_concurrency.spec @@ -74,6 +74,11 @@ step "s2-begin" BEGIN; } +step "s2-begin-repeatable" +{ + BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; +} + step "s2-insert" { INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i; @@ -103,3 +108,5 @@ permutation "s1-begin" "s2-begin" "s2-insert" "s1-copy" "s1-select" "s2-select" # Then verify that while the stripe written by session 2 has the greater first_row_number, stripe written by session 1 has # the greater stripe_num. This is because, we reserve stripe_num and first_row_number at different times. permutation "s1-truncate" "s1-begin" "s1-insert-10000-rows" "s2-begin" "s2-insert" "s2-commit" "s1-commit" "s1-verify-metadata" + +permutation "s1-begin" "s2-begin-repeatable" "s1-insert" "s2-insert" "s2-select" "s1-commit" "s2-select" "s2-commit" diff --git a/src/test/regress/sql/columnar_insert.sql b/src/test/regress/sql/columnar_insert.sql index 1798f8571..96237ce02 100644 --- a/src/test/regress/sql/columnar_insert.sql +++ b/src/test/regress/sql/columnar_insert.sql @@ -2,6 +2,9 @@ -- Testing insert on columnar tables. -- +CREATE SCHEMA columnar_insert; +SET search_path TO columnar_insert; + CREATE TABLE test_insert_command (a int) USING columnar; -- test single row inserts fail @@ -152,4 +155,51 @@ SELECT relname, stripe_num, chunk_group_num, row_count FROM columnar.chunk_group WHERE columnar_test_helpers.columnar_relation_storageid(b.oid)=a.storage_id AND relname = 'zero_col' ORDER BY 1,2,3,4; -DROP TABLE zero_col; +CREATE TABLE selfinsert(x int) USING columnar; + +SELECT alter_columnar_table_set('selfinsert', stripe_row_limit => 1000); + +BEGIN; + INSERT INTO selfinsert SELECT generate_series(1,1010); + INSERT INTO selfinsert SELECT * FROM selfinsert; + + SELECT SUM(x)=1021110 FROM selfinsert; +ROLLBACK; + +BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; + INSERT INTO selfinsert SELECT generate_series(1,1010); + INSERT INTO selfinsert SELECT * FROM selfinsert; + + SELECT SUM(x)=1021110 FROM selfinsert; +ROLLBACK; + +INSERT INTO selfinsert SELECT generate_series(1,1010); +INSERT INTO selfinsert SELECT * FROM selfinsert; + +SELECT SUM(x)=1021110 FROM selfinsert; + +CREATE TABLE selfconflict (f1 int PRIMARY KEY, f2 int) USING columnar; + +BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; + INSERT INTO selfconflict VALUES (2,1), (2,2); +COMMIT; + +SELECT COUNT(*)=0 FROM selfconflict; + +CREATE TABLE flush_create_index(a int, b int) USING columnar; +BEGIN; + INSERT INTO flush_create_index VALUES (5, 10); + + SET enable_seqscan TO OFF; + SET columnar.enable_custom_scan TO OFF; + SET enable_indexscan TO ON; + + CREATE INDEX ON flush_create_index(a); + + SELECT a FROM flush_create_index WHERE a=5; +ROLLBACK; + +RESET search_path; +SET client_min_messages TO WARNING; +DROP SCHEMA columnar_insert CASCADE; +