diff --git a/src/backend/columnar/columnar_reader.c b/src/backend/columnar/columnar_reader.c index 87b4131ce..61762160b 100644 --- a/src/backend/columnar/columnar_reader.c +++ b/src/backend/columnar/columnar_reader.c @@ -90,6 +90,7 @@ struct ColumnarReadState MemoryContext scanContext; Snapshot snapshot; + bool snapshotRegisteredByUs; }; /* static function declarations */ @@ -171,7 +172,8 @@ static Datum ColumnDefaultValue(TupleConstr *tupleConstraints, ColumnarReadState * ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor, List *projectedColumnList, List *whereClauseList, - MemoryContext scanContext, Snapshot snapshot) + MemoryContext scanContext, Snapshot snapshot, + bool snapshotRegisteredByUs) { /* * We allocate all stripe specific data in the stripeReadContext, and reset @@ -194,6 +196,7 @@ ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor, snapshot); readState->scanContext = scanContext; readState->snapshot = snapshot; + readState->snapshotRegisteredByUs = snapshotRegisteredByUs; return readState; } @@ -467,6 +470,15 @@ ColumnarRescan(ColumnarReadState *readState) void ColumnarEndRead(ColumnarReadState *readState) { + if (readState->snapshotRegisteredByUs) + { + /* + * init_columnar_read_state created a new snapshot and registered it, + * so now forget it. + */ + UnregisterSnapshot(readState->snapshot); + } + MemoryContextDelete(readState->stripeReadContext); if (readState->currentStripeMetadata) { diff --git a/src/backend/columnar/columnar_tableam.c b/src/backend/columnar/columnar_tableam.c index 572c74e28..83e395fd5 100644 --- a/src/backend/columnar/columnar_tableam.c +++ b/src/backend/columnar/columnar_tableam.c @@ -258,9 +258,49 @@ init_columnar_read_state(Relation relation, TupleDesc tupdesc, Bitmapset *attr_n Oid relfilenode = relation->rd_node.relNode; FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId()); + bool snapshotRegisteredByUs = false; + if (snapshot != InvalidSnapshot && IsMVCCSnapshot(snapshot)) + { + /* + * If we flushed any pending writes, then we should guarantee that + * those writes are visible to us too. For this reason, if given + * snapshot is an MVCC snapshot, then we set its curcid to current + * command id. + * + * For simplicity, we do that even if we didn't flush any writes + * since we don't see any problem with that. + * + * XXX: We should either not update cid if we are executing a FETCH + * (from cursor) command, or we should have a better way to deal with + * pending writes, see the discussion in + * https://github.com/citusdata/citus/issues/5231. + */ + PushCopiedSnapshot(snapshot); + + /* now our snapshot is the active one */ + UpdateActiveSnapshotCommandId(); + snapshot = GetActiveSnapshot(); + RegisterSnapshot(snapshot); + + /* + * To be able to use UpdateActiveSnapshotCommandId, we pushed the + * copied snapshot to the stack. However, we don't need to keep it + * there since we will anyway rely on ColumnarReadState->snapshot + * during read operation. + * + * Note that since we registered the snapshot already, we guarantee + * that PopActiveSnapshot won't free it. + */ + PopActiveSnapshot(); + + /* not forget to unregister it when finishing read operation */ + snapshotRegisteredByUs = true; + } + List *neededColumnList = NeededColumnsList(tupdesc, attr_needed); ColumnarReadState *readState = ColumnarBeginRead(relation, tupdesc, neededColumnList, - scanQual, scanContext, snapshot); + scanQual, scanContext, snapshot, + snapshotRegisteredByUs); MemoryContextSwitchTo(oldContext); diff --git a/src/include/columnar/columnar.h b/src/include/columnar/columnar.h index 6ca7450ac..40cb81e70 100644 --- a/src/include/columnar/columnar.h +++ b/src/include/columnar/columnar.h @@ -214,7 +214,8 @@ extern ColumnarReadState * ColumnarBeginRead(Relation relation, List *projectedColumnList, List *qualConditions, MemoryContext scanContext, - Snapshot snaphot); + Snapshot snaphot, + bool snapshotRegisteredByUs); extern bool ColumnarReadNextRow(ColumnarReadState *state, Datum *columnValues, bool *columnNulls, uint64 *rowNumber); extern void ColumnarRescan(ColumnarReadState *readState); diff --git a/src/test/regress/expected/columnar_insert.out b/src/test/regress/expected/columnar_insert.out index 75dc04ad7..edbd21c53 100644 --- a/src/test/regress/expected/columnar_insert.out +++ b/src/test/regress/expected/columnar_insert.out @@ -1,6 +1,8 @@ -- -- Testing insert on columnar tables. -- +CREATE SCHEMA columnar_insert; +SET search_path TO columnar_insert; CREATE TABLE test_insert_command (a int) USING columnar; -- test single row inserts fail select count(*) from test_insert_command; @@ -228,4 +230,67 @@ ORDER BY 1,2,3,4; zero_col | 5 | 0 | 64 (5 rows) -DROP TABLE zero_col; +CREATE TABLE selfinsert(x int) USING columnar; +SELECT alter_columnar_table_set('selfinsert', stripe_row_limit => 1000); + alter_columnar_table_set +--------------------------------------------------------------------- + +(1 row) + +BEGIN; + INSERT INTO selfinsert SELECT generate_series(1,1010); + INSERT INTO selfinsert SELECT * FROM selfinsert; + SELECT SUM(x)=1021110 FROM selfinsert; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; + INSERT INTO selfinsert SELECT generate_series(1,1010); + INSERT INTO selfinsert SELECT * FROM selfinsert; + SELECT SUM(x)=1021110 FROM selfinsert; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +INSERT INTO selfinsert SELECT generate_series(1,1010); +INSERT INTO selfinsert SELECT * FROM selfinsert; +SELECT SUM(x)=1021110 FROM selfinsert; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +CREATE TABLE selfconflict (f1 int PRIMARY KEY, f2 int) USING columnar; +BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; + INSERT INTO selfconflict VALUES (2,1), (2,2); +ERROR: duplicate key value violates unique constraint "selfconflict_pkey" +DETAIL: Key (f1)=(2) already exists. +COMMIT; +SELECT COUNT(*)=0 FROM selfconflict; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +CREATE TABLE flush_create_index(a int, b int) USING columnar; +BEGIN; + INSERT INTO flush_create_index VALUES (5, 10); + SET enable_seqscan TO OFF; + SET columnar.enable_custom_scan TO OFF; + SET enable_indexscan TO ON; + CREATE INDEX ON flush_create_index(a); + SELECT a FROM flush_create_index WHERE a=5; + a +--------------------------------------------------------------------- + 5 +(1 row) + +ROLLBACK; +RESET search_path; +SET client_min_messages TO WARNING; +DROP SCHEMA columnar_insert CASCADE; diff --git a/src/test/regress/expected/columnar_write_concurrency.out b/src/test/regress/expected/columnar_write_concurrency.out index 2d98674f7..88a4dc0e7 100644 --- a/src/test/regress/expected/columnar_write_concurrency.out +++ b/src/test/regress/expected/columnar_write_concurrency.out @@ -202,3 +202,43 @@ stripe_metadata_for_test_insert_concurrency_ok t (1 row) + +starting permutation: s1-begin s2-begin-repeatable s1-insert s2-insert s2-select s1-commit s2-select s2-commit +step s1-begin: + BEGIN; + +step s2-begin-repeatable: + BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; + +step s1-insert: + INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(1, 3) i; + +step s2-insert: + INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i; + +step s2-select: + SELECT * FROM test_insert_concurrency ORDER BY a; + +a| b +--------------------------------------------------------------------- +4| 8 +5|10 +6|12 +(3 rows) + +step s1-commit: + COMMIT; + +step s2-select: + SELECT * FROM test_insert_concurrency ORDER BY a; + +a| b +--------------------------------------------------------------------- +4| 8 +5|10 +6|12 +(3 rows) + +step s2-commit: + COMMIT; + diff --git a/src/test/regress/spec/columnar_write_concurrency.spec b/src/test/regress/spec/columnar_write_concurrency.spec index edfead34a..4b29c24a8 100644 --- a/src/test/regress/spec/columnar_write_concurrency.spec +++ b/src/test/regress/spec/columnar_write_concurrency.spec @@ -74,6 +74,11 @@ step "s2-begin" BEGIN; } +step "s2-begin-repeatable" +{ + BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; +} + step "s2-insert" { INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i; @@ -103,3 +108,5 @@ permutation "s1-begin" "s2-begin" "s2-insert" "s1-copy" "s1-select" "s2-select" # Then verify that while the stripe written by session 2 has the greater first_row_number, stripe written by session 1 has # the greater stripe_num. This is because, we reserve stripe_num and first_row_number at different times. permutation "s1-truncate" "s1-begin" "s1-insert-10000-rows" "s2-begin" "s2-insert" "s2-commit" "s1-commit" "s1-verify-metadata" + +permutation "s1-begin" "s2-begin-repeatable" "s1-insert" "s2-insert" "s2-select" "s1-commit" "s2-select" "s2-commit" diff --git a/src/test/regress/sql/columnar_insert.sql b/src/test/regress/sql/columnar_insert.sql index 1798f8571..96237ce02 100644 --- a/src/test/regress/sql/columnar_insert.sql +++ b/src/test/regress/sql/columnar_insert.sql @@ -2,6 +2,9 @@ -- Testing insert on columnar tables. -- +CREATE SCHEMA columnar_insert; +SET search_path TO columnar_insert; + CREATE TABLE test_insert_command (a int) USING columnar; -- test single row inserts fail @@ -152,4 +155,51 @@ SELECT relname, stripe_num, chunk_group_num, row_count FROM columnar.chunk_group WHERE columnar_test_helpers.columnar_relation_storageid(b.oid)=a.storage_id AND relname = 'zero_col' ORDER BY 1,2,3,4; -DROP TABLE zero_col; +CREATE TABLE selfinsert(x int) USING columnar; + +SELECT alter_columnar_table_set('selfinsert', stripe_row_limit => 1000); + +BEGIN; + INSERT INTO selfinsert SELECT generate_series(1,1010); + INSERT INTO selfinsert SELECT * FROM selfinsert; + + SELECT SUM(x)=1021110 FROM selfinsert; +ROLLBACK; + +BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; + INSERT INTO selfinsert SELECT generate_series(1,1010); + INSERT INTO selfinsert SELECT * FROM selfinsert; + + SELECT SUM(x)=1021110 FROM selfinsert; +ROLLBACK; + +INSERT INTO selfinsert SELECT generate_series(1,1010); +INSERT INTO selfinsert SELECT * FROM selfinsert; + +SELECT SUM(x)=1021110 FROM selfinsert; + +CREATE TABLE selfconflict (f1 int PRIMARY KEY, f2 int) USING columnar; + +BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; + INSERT INTO selfconflict VALUES (2,1), (2,2); +COMMIT; + +SELECT COUNT(*)=0 FROM selfconflict; + +CREATE TABLE flush_create_index(a int, b int) USING columnar; +BEGIN; + INSERT INTO flush_create_index VALUES (5, 10); + + SET enable_seqscan TO OFF; + SET columnar.enable_custom_scan TO OFF; + SET enable_indexscan TO ON; + + CREATE INDEX ON flush_create_index(a); + + SELECT a FROM flush_create_index WHERE a=5; +ROLLBACK; + +RESET search_path; +SET client_min_messages TO WARNING; +DROP SCHEMA columnar_insert CASCADE; +