Update curcid of given snapshot if it is MVCC

Before starting to scan a columnar table, we always flush the pending
writes to disk.

However, we increment command counter after modifying metadata tables.

On the other hand, now that we _don't always use_ xact snapshot to scan
a columnar table, writes that we just flushed might not be visible to
the query that just flushed pending writes to disk since curcid of
provided snapshot would become smaller than the command id being used
when modifying metadata tables.

To give an example, before this change, below was a possible scenario
due to the changes that we made to use the correct snapshot.

```sql
CREATE TABLE t(a int, b int) USING columnar;
BEGIN;
  INSERT INTO t VALUES (5, 10);

  SELECT * FROM t;
  ┌───┬───┐
  │ a │ b │
  ├───┼───┤
  └───┴───┘
  (0 rows)

  SELECT * FROM t;
  ┌───┬────┐
  │ a │ b  │
  ├───┼────┤
  │ 5 │ 10 │
  └───┴────┘
  (1 row)
```
pull/5154/head
Onur Tirtir 2021-07-29 19:19:09 +03:00
parent 6c26c67ea0
commit bf4dfad6f7
7 changed files with 220 additions and 5 deletions

View File

@ -90,6 +90,7 @@ struct ColumnarReadState
MemoryContext scanContext;
Snapshot snapshot;
bool snapshotRegisteredByUs;
};
/* static function declarations */
@ -171,7 +172,8 @@ static Datum ColumnDefaultValue(TupleConstr *tupleConstraints,
ColumnarReadState *
ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
List *projectedColumnList, List *whereClauseList,
MemoryContext scanContext, Snapshot snapshot)
MemoryContext scanContext, Snapshot snapshot,
bool snapshotRegisteredByUs)
{
/*
* We allocate all stripe specific data in the stripeReadContext, and reset
@ -194,6 +196,7 @@ ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
snapshot);
readState->scanContext = scanContext;
readState->snapshot = snapshot;
readState->snapshotRegisteredByUs = snapshotRegisteredByUs;
return readState;
}
@ -467,6 +470,15 @@ ColumnarRescan(ColumnarReadState *readState)
void
ColumnarEndRead(ColumnarReadState *readState)
{
if (readState->snapshotRegisteredByUs)
{
/*
* init_columnar_read_state created a new snapshot and registered it,
* so now forget it.
*/
UnregisterSnapshot(readState->snapshot);
}
MemoryContextDelete(readState->stripeReadContext);
if (readState->currentStripeMetadata)
{

View File

@ -258,9 +258,49 @@ init_columnar_read_state(Relation relation, TupleDesc tupdesc, Bitmapset *attr_n
Oid relfilenode = relation->rd_node.relNode;
FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId());
bool snapshotRegisteredByUs = false;
if (snapshot != InvalidSnapshot && IsMVCCSnapshot(snapshot))
{
/*
* If we flushed any pending writes, then we should guarantee that
* those writes are visible to us too. For this reason, if given
* snapshot is an MVCC snapshot, then we set its curcid to current
* command id.
*
* For simplicity, we do that even if we didn't flush any writes
* since we don't see any problem with that.
*
* XXX: We should either not update cid if we are executing a FETCH
* (from cursor) command, or we should have a better way to deal with
* pending writes, see the discussion in
* https://github.com/citusdata/citus/issues/5231.
*/
PushCopiedSnapshot(snapshot);
/* now our snapshot is the active one */
UpdateActiveSnapshotCommandId();
snapshot = GetActiveSnapshot();
RegisterSnapshot(snapshot);
/*
* To be able to use UpdateActiveSnapshotCommandId, we pushed the
* copied snapshot to the stack. However, we don't need to keep it
* there since we will anyway rely on ColumnarReadState->snapshot
* during read operation.
*
* Note that since we registered the snapshot already, we guarantee
* that PopActiveSnapshot won't free it.
*/
PopActiveSnapshot();
/* not forget to unregister it when finishing read operation */
snapshotRegisteredByUs = true;
}
List *neededColumnList = NeededColumnsList(tupdesc, attr_needed);
ColumnarReadState *readState = ColumnarBeginRead(relation, tupdesc, neededColumnList,
scanQual, scanContext, snapshot);
scanQual, scanContext, snapshot,
snapshotRegisteredByUs);
MemoryContextSwitchTo(oldContext);

View File

@ -214,7 +214,8 @@ extern ColumnarReadState * ColumnarBeginRead(Relation relation,
List *projectedColumnList,
List *qualConditions,
MemoryContext scanContext,
Snapshot snaphot);
Snapshot snaphot,
bool snapshotRegisteredByUs);
extern bool ColumnarReadNextRow(ColumnarReadState *state, Datum *columnValues,
bool *columnNulls, uint64 *rowNumber);
extern void ColumnarRescan(ColumnarReadState *readState);

View File

@ -1,6 +1,8 @@
--
-- Testing insert on columnar tables.
--
CREATE SCHEMA columnar_insert;
SET search_path TO columnar_insert;
CREATE TABLE test_insert_command (a int) USING columnar;
-- test single row inserts fail
select count(*) from test_insert_command;
@ -228,4 +230,67 @@ ORDER BY 1,2,3,4;
zero_col | 5 | 0 | 64
(5 rows)
DROP TABLE zero_col;
CREATE TABLE selfinsert(x int) USING columnar;
SELECT alter_columnar_table_set('selfinsert', stripe_row_limit => 1000);
alter_columnar_table_set
---------------------------------------------------------------------
(1 row)
BEGIN;
INSERT INTO selfinsert SELECT generate_series(1,1010);
INSERT INTO selfinsert SELECT * FROM selfinsert;
SELECT SUM(x)=1021110 FROM selfinsert;
?column?
---------------------------------------------------------------------
t
(1 row)
ROLLBACK;
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
INSERT INTO selfinsert SELECT generate_series(1,1010);
INSERT INTO selfinsert SELECT * FROM selfinsert;
SELECT SUM(x)=1021110 FROM selfinsert;
?column?
---------------------------------------------------------------------
t
(1 row)
ROLLBACK;
INSERT INTO selfinsert SELECT generate_series(1,1010);
INSERT INTO selfinsert SELECT * FROM selfinsert;
SELECT SUM(x)=1021110 FROM selfinsert;
?column?
---------------------------------------------------------------------
t
(1 row)
CREATE TABLE selfconflict (f1 int PRIMARY KEY, f2 int) USING columnar;
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
INSERT INTO selfconflict VALUES (2,1), (2,2);
ERROR: duplicate key value violates unique constraint "selfconflict_pkey"
DETAIL: Key (f1)=(2) already exists.
COMMIT;
SELECT COUNT(*)=0 FROM selfconflict;
?column?
---------------------------------------------------------------------
t
(1 row)
CREATE TABLE flush_create_index(a int, b int) USING columnar;
BEGIN;
INSERT INTO flush_create_index VALUES (5, 10);
SET enable_seqscan TO OFF;
SET columnar.enable_custom_scan TO OFF;
SET enable_indexscan TO ON;
CREATE INDEX ON flush_create_index(a);
SELECT a FROM flush_create_index WHERE a=5;
a
---------------------------------------------------------------------
5
(1 row)
ROLLBACK;
RESET search_path;
SET client_min_messages TO WARNING;
DROP SCHEMA columnar_insert CASCADE;

View File

@ -202,3 +202,43 @@ stripe_metadata_for_test_insert_concurrency_ok
t
(1 row)
starting permutation: s1-begin s2-begin-repeatable s1-insert s2-insert s2-select s1-commit s2-select s2-commit
step s1-begin:
BEGIN;
step s2-begin-repeatable:
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
step s1-insert:
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(1, 3) i;
step s2-insert:
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i;
step s2-select:
SELECT * FROM test_insert_concurrency ORDER BY a;
a| b
---------------------------------------------------------------------
4| 8
5|10
6|12
(3 rows)
step s1-commit:
COMMIT;
step s2-select:
SELECT * FROM test_insert_concurrency ORDER BY a;
a| b
---------------------------------------------------------------------
4| 8
5|10
6|12
(3 rows)
step s2-commit:
COMMIT;

View File

@ -74,6 +74,11 @@ step "s2-begin"
BEGIN;
}
step "s2-begin-repeatable"
{
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
}
step "s2-insert"
{
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i;
@ -103,3 +108,5 @@ permutation "s1-begin" "s2-begin" "s2-insert" "s1-copy" "s1-select" "s2-select"
# Then verify that while the stripe written by session 2 has the greater first_row_number, stripe written by session 1 has
# the greater stripe_num. This is because, we reserve stripe_num and first_row_number at different times.
permutation "s1-truncate" "s1-begin" "s1-insert-10000-rows" "s2-begin" "s2-insert" "s2-commit" "s1-commit" "s1-verify-metadata"
permutation "s1-begin" "s2-begin-repeatable" "s1-insert" "s2-insert" "s2-select" "s1-commit" "s2-select" "s2-commit"

View File

@ -2,6 +2,9 @@
-- Testing insert on columnar tables.
--
CREATE SCHEMA columnar_insert;
SET search_path TO columnar_insert;
CREATE TABLE test_insert_command (a int) USING columnar;
-- test single row inserts fail
@ -152,4 +155,51 @@ SELECT relname, stripe_num, chunk_group_num, row_count FROM columnar.chunk_group
WHERE columnar_test_helpers.columnar_relation_storageid(b.oid)=a.storage_id AND relname = 'zero_col'
ORDER BY 1,2,3,4;
DROP TABLE zero_col;
CREATE TABLE selfinsert(x int) USING columnar;
SELECT alter_columnar_table_set('selfinsert', stripe_row_limit => 1000);
BEGIN;
INSERT INTO selfinsert SELECT generate_series(1,1010);
INSERT INTO selfinsert SELECT * FROM selfinsert;
SELECT SUM(x)=1021110 FROM selfinsert;
ROLLBACK;
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
INSERT INTO selfinsert SELECT generate_series(1,1010);
INSERT INTO selfinsert SELECT * FROM selfinsert;
SELECT SUM(x)=1021110 FROM selfinsert;
ROLLBACK;
INSERT INTO selfinsert SELECT generate_series(1,1010);
INSERT INTO selfinsert SELECT * FROM selfinsert;
SELECT SUM(x)=1021110 FROM selfinsert;
CREATE TABLE selfconflict (f1 int PRIMARY KEY, f2 int) USING columnar;
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
INSERT INTO selfconflict VALUES (2,1), (2,2);
COMMIT;
SELECT COUNT(*)=0 FROM selfconflict;
CREATE TABLE flush_create_index(a int, b int) USING columnar;
BEGIN;
INSERT INTO flush_create_index VALUES (5, 10);
SET enable_seqscan TO OFF;
SET columnar.enable_custom_scan TO OFF;
SET enable_indexscan TO ON;
CREATE INDEX ON flush_create_index(a);
SELECT a FROM flush_create_index WHERE a=5;
ROLLBACK;
RESET search_path;
SET client_min_messages TO WARNING;
DROP SCHEMA columnar_insert CASCADE;