From bf4dfad6f76a591403551de8596fa4392adeca67 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Thu, 29 Jul 2021 19:19:09 +0300 Subject: [PATCH] Update curcid of given snapshot if it is MVCC MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Before starting to scan a columnar table, we always flush the pending writes to disk. However, we increment command counter after modifying metadata tables. On the other hand, now that we _don't always use_ xact snapshot to scan a columnar table, writes that we just flushed might not be visible to the query that just flushed pending writes to disk since curcid of provided snapshot would become smaller than the command id being used when modifying metadata tables. To give an example, before this change, below was a possible scenario due to the changes that we made to use the correct snapshot. ```sql CREATE TABLE t(a int, b int) USING columnar; BEGIN; INSERT INTO t VALUES (5, 10); SELECT * FROM t; ┌───┬───┐ │ a │ b │ ├───┼───┤ └───┴───┘ (0 rows) SELECT * FROM t; ┌───┬────┐ │ a │ b │ ├───┼────┤ │ 5 │ 10 │ └───┴────┘ (1 row) ``` --- src/backend/columnar/columnar_reader.c | 14 +++- src/backend/columnar/columnar_tableam.c | 42 +++++++++++- src/include/columnar/columnar.h | 3 +- src/test/regress/expected/columnar_insert.out | 67 ++++++++++++++++++- .../expected/columnar_write_concurrency.out | 40 +++++++++++ .../spec/columnar_write_concurrency.spec | 7 ++ src/test/regress/sql/columnar_insert.sql | 52 +++++++++++++- 7 files changed, 220 insertions(+), 5 deletions(-) diff --git a/src/backend/columnar/columnar_reader.c b/src/backend/columnar/columnar_reader.c index 87b4131ce..61762160b 100644 --- a/src/backend/columnar/columnar_reader.c +++ b/src/backend/columnar/columnar_reader.c @@ -90,6 +90,7 @@ struct ColumnarReadState MemoryContext scanContext; Snapshot snapshot; + bool snapshotRegisteredByUs; }; /* static function declarations */ @@ -171,7 +172,8 @@ static Datum ColumnDefaultValue(TupleConstr *tupleConstraints, ColumnarReadState * ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor, List *projectedColumnList, List *whereClauseList, - MemoryContext scanContext, Snapshot snapshot) + MemoryContext scanContext, Snapshot snapshot, + bool snapshotRegisteredByUs) { /* * We allocate all stripe specific data in the stripeReadContext, and reset @@ -194,6 +196,7 @@ ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor, snapshot); readState->scanContext = scanContext; readState->snapshot = snapshot; + readState->snapshotRegisteredByUs = snapshotRegisteredByUs; return readState; } @@ -467,6 +470,15 @@ ColumnarRescan(ColumnarReadState *readState) void ColumnarEndRead(ColumnarReadState *readState) { + if (readState->snapshotRegisteredByUs) + { + /* + * init_columnar_read_state created a new snapshot and registered it, + * so now forget it. + */ + UnregisterSnapshot(readState->snapshot); + } + MemoryContextDelete(readState->stripeReadContext); if (readState->currentStripeMetadata) { diff --git a/src/backend/columnar/columnar_tableam.c b/src/backend/columnar/columnar_tableam.c index 572c74e28..83e395fd5 100644 --- a/src/backend/columnar/columnar_tableam.c +++ b/src/backend/columnar/columnar_tableam.c @@ -258,9 +258,49 @@ init_columnar_read_state(Relation relation, TupleDesc tupdesc, Bitmapset *attr_n Oid relfilenode = relation->rd_node.relNode; FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId()); + bool snapshotRegisteredByUs = false; + if (snapshot != InvalidSnapshot && IsMVCCSnapshot(snapshot)) + { + /* + * If we flushed any pending writes, then we should guarantee that + * those writes are visible to us too. For this reason, if given + * snapshot is an MVCC snapshot, then we set its curcid to current + * command id. + * + * For simplicity, we do that even if we didn't flush any writes + * since we don't see any problem with that. + * + * XXX: We should either not update cid if we are executing a FETCH + * (from cursor) command, or we should have a better way to deal with + * pending writes, see the discussion in + * https://github.com/citusdata/citus/issues/5231. + */ + PushCopiedSnapshot(snapshot); + + /* now our snapshot is the active one */ + UpdateActiveSnapshotCommandId(); + snapshot = GetActiveSnapshot(); + RegisterSnapshot(snapshot); + + /* + * To be able to use UpdateActiveSnapshotCommandId, we pushed the + * copied snapshot to the stack. However, we don't need to keep it + * there since we will anyway rely on ColumnarReadState->snapshot + * during read operation. + * + * Note that since we registered the snapshot already, we guarantee + * that PopActiveSnapshot won't free it. + */ + PopActiveSnapshot(); + + /* not forget to unregister it when finishing read operation */ + snapshotRegisteredByUs = true; + } + List *neededColumnList = NeededColumnsList(tupdesc, attr_needed); ColumnarReadState *readState = ColumnarBeginRead(relation, tupdesc, neededColumnList, - scanQual, scanContext, snapshot); + scanQual, scanContext, snapshot, + snapshotRegisteredByUs); MemoryContextSwitchTo(oldContext); diff --git a/src/include/columnar/columnar.h b/src/include/columnar/columnar.h index 6ca7450ac..40cb81e70 100644 --- a/src/include/columnar/columnar.h +++ b/src/include/columnar/columnar.h @@ -214,7 +214,8 @@ extern ColumnarReadState * ColumnarBeginRead(Relation relation, List *projectedColumnList, List *qualConditions, MemoryContext scanContext, - Snapshot snaphot); + Snapshot snaphot, + bool snapshotRegisteredByUs); extern bool ColumnarReadNextRow(ColumnarReadState *state, Datum *columnValues, bool *columnNulls, uint64 *rowNumber); extern void ColumnarRescan(ColumnarReadState *readState); diff --git a/src/test/regress/expected/columnar_insert.out b/src/test/regress/expected/columnar_insert.out index 75dc04ad7..edbd21c53 100644 --- a/src/test/regress/expected/columnar_insert.out +++ b/src/test/regress/expected/columnar_insert.out @@ -1,6 +1,8 @@ -- -- Testing insert on columnar tables. -- +CREATE SCHEMA columnar_insert; +SET search_path TO columnar_insert; CREATE TABLE test_insert_command (a int) USING columnar; -- test single row inserts fail select count(*) from test_insert_command; @@ -228,4 +230,67 @@ ORDER BY 1,2,3,4; zero_col | 5 | 0 | 64 (5 rows) -DROP TABLE zero_col; +CREATE TABLE selfinsert(x int) USING columnar; +SELECT alter_columnar_table_set('selfinsert', stripe_row_limit => 1000); + alter_columnar_table_set +--------------------------------------------------------------------- + +(1 row) + +BEGIN; + INSERT INTO selfinsert SELECT generate_series(1,1010); + INSERT INTO selfinsert SELECT * FROM selfinsert; + SELECT SUM(x)=1021110 FROM selfinsert; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; + INSERT INTO selfinsert SELECT generate_series(1,1010); + INSERT INTO selfinsert SELECT * FROM selfinsert; + SELECT SUM(x)=1021110 FROM selfinsert; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +INSERT INTO selfinsert SELECT generate_series(1,1010); +INSERT INTO selfinsert SELECT * FROM selfinsert; +SELECT SUM(x)=1021110 FROM selfinsert; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +CREATE TABLE selfconflict (f1 int PRIMARY KEY, f2 int) USING columnar; +BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; + INSERT INTO selfconflict VALUES (2,1), (2,2); +ERROR: duplicate key value violates unique constraint "selfconflict_pkey" +DETAIL: Key (f1)=(2) already exists. +COMMIT; +SELECT COUNT(*)=0 FROM selfconflict; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +CREATE TABLE flush_create_index(a int, b int) USING columnar; +BEGIN; + INSERT INTO flush_create_index VALUES (5, 10); + SET enable_seqscan TO OFF; + SET columnar.enable_custom_scan TO OFF; + SET enable_indexscan TO ON; + CREATE INDEX ON flush_create_index(a); + SELECT a FROM flush_create_index WHERE a=5; + a +--------------------------------------------------------------------- + 5 +(1 row) + +ROLLBACK; +RESET search_path; +SET client_min_messages TO WARNING; +DROP SCHEMA columnar_insert CASCADE; diff --git a/src/test/regress/expected/columnar_write_concurrency.out b/src/test/regress/expected/columnar_write_concurrency.out index 2d98674f7..88a4dc0e7 100644 --- a/src/test/regress/expected/columnar_write_concurrency.out +++ b/src/test/regress/expected/columnar_write_concurrency.out @@ -202,3 +202,43 @@ stripe_metadata_for_test_insert_concurrency_ok t (1 row) + +starting permutation: s1-begin s2-begin-repeatable s1-insert s2-insert s2-select s1-commit s2-select s2-commit +step s1-begin: + BEGIN; + +step s2-begin-repeatable: + BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; + +step s1-insert: + INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(1, 3) i; + +step s2-insert: + INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i; + +step s2-select: + SELECT * FROM test_insert_concurrency ORDER BY a; + +a| b +--------------------------------------------------------------------- +4| 8 +5|10 +6|12 +(3 rows) + +step s1-commit: + COMMIT; + +step s2-select: + SELECT * FROM test_insert_concurrency ORDER BY a; + +a| b +--------------------------------------------------------------------- +4| 8 +5|10 +6|12 +(3 rows) + +step s2-commit: + COMMIT; + diff --git a/src/test/regress/spec/columnar_write_concurrency.spec b/src/test/regress/spec/columnar_write_concurrency.spec index edfead34a..4b29c24a8 100644 --- a/src/test/regress/spec/columnar_write_concurrency.spec +++ b/src/test/regress/spec/columnar_write_concurrency.spec @@ -74,6 +74,11 @@ step "s2-begin" BEGIN; } +step "s2-begin-repeatable" +{ + BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; +} + step "s2-insert" { INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i; @@ -103,3 +108,5 @@ permutation "s1-begin" "s2-begin" "s2-insert" "s1-copy" "s1-select" "s2-select" # Then verify that while the stripe written by session 2 has the greater first_row_number, stripe written by session 1 has # the greater stripe_num. This is because, we reserve stripe_num and first_row_number at different times. permutation "s1-truncate" "s1-begin" "s1-insert-10000-rows" "s2-begin" "s2-insert" "s2-commit" "s1-commit" "s1-verify-metadata" + +permutation "s1-begin" "s2-begin-repeatable" "s1-insert" "s2-insert" "s2-select" "s1-commit" "s2-select" "s2-commit" diff --git a/src/test/regress/sql/columnar_insert.sql b/src/test/regress/sql/columnar_insert.sql index 1798f8571..96237ce02 100644 --- a/src/test/regress/sql/columnar_insert.sql +++ b/src/test/regress/sql/columnar_insert.sql @@ -2,6 +2,9 @@ -- Testing insert on columnar tables. -- +CREATE SCHEMA columnar_insert; +SET search_path TO columnar_insert; + CREATE TABLE test_insert_command (a int) USING columnar; -- test single row inserts fail @@ -152,4 +155,51 @@ SELECT relname, stripe_num, chunk_group_num, row_count FROM columnar.chunk_group WHERE columnar_test_helpers.columnar_relation_storageid(b.oid)=a.storage_id AND relname = 'zero_col' ORDER BY 1,2,3,4; -DROP TABLE zero_col; +CREATE TABLE selfinsert(x int) USING columnar; + +SELECT alter_columnar_table_set('selfinsert', stripe_row_limit => 1000); + +BEGIN; + INSERT INTO selfinsert SELECT generate_series(1,1010); + INSERT INTO selfinsert SELECT * FROM selfinsert; + + SELECT SUM(x)=1021110 FROM selfinsert; +ROLLBACK; + +BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; + INSERT INTO selfinsert SELECT generate_series(1,1010); + INSERT INTO selfinsert SELECT * FROM selfinsert; + + SELECT SUM(x)=1021110 FROM selfinsert; +ROLLBACK; + +INSERT INTO selfinsert SELECT generate_series(1,1010); +INSERT INTO selfinsert SELECT * FROM selfinsert; + +SELECT SUM(x)=1021110 FROM selfinsert; + +CREATE TABLE selfconflict (f1 int PRIMARY KEY, f2 int) USING columnar; + +BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; + INSERT INTO selfconflict VALUES (2,1), (2,2); +COMMIT; + +SELECT COUNT(*)=0 FROM selfconflict; + +CREATE TABLE flush_create_index(a int, b int) USING columnar; +BEGIN; + INSERT INTO flush_create_index VALUES (5, 10); + + SET enable_seqscan TO OFF; + SET columnar.enable_custom_scan TO OFF; + SET enable_indexscan TO ON; + + CREATE INDEX ON flush_create_index(a); + + SELECT a FROM flush_create_index WHERE a=5; +ROLLBACK; + +RESET search_path; +SET client_min_messages TO WARNING; +DROP SCHEMA columnar_insert CASCADE; +