Not flush writes until need to read them when doing index-scan on columnar (#5247)

Not flush pending writes if given tid belongs to a "flushed" or
"aborted" stripe write, or to an "in-progress" stripe write of
another backend.

That way, we would reduce the cases where we flush single-tuple
stripes during index scan.

To do that, we follow below steps for index look-up's:

- Do not flush any pending writes and do stripe metadata look-up for
  given tid.
  If tuple with tid is found, then no need to do another look-up
  since we already found the tuple without needing to flush pending
  writes.

- If tuple is not found without flushing pending writes, then we have two
  scenarios:

  -  If given tid belongs to a pending write of my backend, then do stripe
     metadata look-up for given tid. But this time first **flush any pending
     writes**.
     
  -  Otherwise, just return false from `index_fetch_tuple` since flushing
      pending writes wouldn't help.
pull/5275/head
Onur Tirtir 2021-09-13 19:41:20 +03:00 committed by GitHub
parent 4ee0fb2758
commit ea61efb63a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 421 additions and 67 deletions

View File

@ -1261,8 +1261,17 @@ BuildStripeMetadata(Relation columnarStripes, HeapTuple heapTuple)
stripeMetadata->firstRowNumber = DatumGetUInt64(
datumArray[Anum_columnar_stripe_first_row_number - 1]);
/*
* If there is unflushed data in a parent transaction, then we would
* have already thrown an error before starting to scan the table.. If
* the data is from an earlier subxact that committed, then it would
* have been flushed already. For this reason, we don't care about
* subtransaction id here.
*/
TransactionId entryXmin = HeapTupleHeaderGetXmin(heapTuple->t_data);
stripeMetadata->aborted = TransactionIdDidAbort(entryXmin);
stripeMetadata->insertedByCurrentXact =
TransactionIdIsCurrentTransactionId(entryXmin);
CheckStripeMetadataConsistency(stripeMetadata);

View File

@ -19,6 +19,7 @@
#include "safe_lib.h"
#include "access/nbtree.h"
#include "access/xact.h"
#include "catalog/pg_am.h"
#include "commands/defrem.h"
#include "distributed/listutils.h"
@ -178,7 +179,7 @@ ColumnarReadState *
ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
List *projectedColumnList, List *whereClauseList,
MemoryContext scanContext, Snapshot snapshot,
bool snapshotRegisteredByUs)
bool randomAccess)
{
/*
* We allocate all stripe specific data in the stripeReadContext, and reset
@ -197,16 +198,109 @@ ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
readState->stripeReadContext = stripeReadContext;
readState->stripeReadState = NULL;
readState->scanContext = scanContext;
readState->snapshot = snapshot;
readState->snapshotRegisteredByUs = snapshotRegisteredByUs;
/* set currentStripeMetadata for the first stripe to read */
AdvanceStripeRead(readState);
/*
* Note that ColumnarReadFlushPendingWrites might update those two by
* registering a new snapshot.
*/
readState->snapshot = snapshot;
readState->snapshotRegisteredByUs = false;
if (!randomAccess)
{
/*
* When doing random access (i.e.: index scan), we don't need to flush
* pending writes until we need to read them.
* columnar_index_fetch_tuple would do so when needed.
*/
ColumnarReadFlushPendingWrites(readState);
/*
* AdvanceStripeRead sets currentStripeMetadata for the first stripe
* to read if not doing random access. Otherwise, reader (i.e.:
* ColumnarReadRowByRowNumber) would already decide the stripe to read
* on-the-fly.
*
* Moreover, Since we don't flush pending writes for random access,
* AdvanceStripeRead might encounter with stripe metadata entries due
* to current transaction's pending writes even when using an MVCC
* snapshot, but AdvanceStripeRead would throw an error for that.
* Note that this is not the case with for plain table scan methods
* (i.e.: SeqScan and Columnar CustomScan).
*
* For those reasons, we don't call AdvanceStripeRead if we will do
* random access.
*/
AdvanceStripeRead(readState);
}
return readState;
}
/*
* ColumnarReadFlushPendingWrites flushes pending writes for read operation
* and sets a new (registered) snapshot if necessary.
*
* If it sets a new snapshot, then sets snapshotRegisteredByUs to true to
* indicate that caller should unregister the snapshot after finishing read
* operation.
*
* Note that this function assumes that readState's relation and snapshot
* fields are already set.
*/
void
ColumnarReadFlushPendingWrites(ColumnarReadState *readState)
{
Assert(!readState->snapshotRegisteredByUs);
Oid relfilenode = readState->relation->rd_node.relNode;
FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId());
if (readState->snapshot == InvalidSnapshot || !IsMVCCSnapshot(readState->snapshot))
{
return;
}
/*
* If we flushed any pending writes, then we should guarantee that
* those writes are visible to us too. For this reason, if given
* snapshot is an MVCC snapshot, then we set its curcid to current
* command id.
*
* For simplicity, we do that even if we didn't flush any writes
* since we don't see any problem with that.
*
* XXX: We should either not update cid if we are executing a FETCH
* (from cursor) command, or we should have a better way to deal with
* pending writes, see the discussion in
* https://github.com/citusdata/citus/issues/5231.
*/
PushCopiedSnapshot(readState->snapshot);
/* now our snapshot is the active one */
UpdateActiveSnapshotCommandId();
Snapshot newSnapshot = GetActiveSnapshot();
RegisterSnapshot(newSnapshot);
/*
* To be able to use UpdateActiveSnapshotCommandId, we pushed the
* copied snapshot to the stack. However, we don't need to keep it
* there since we will anyway rely on ColumnarReadState->snapshot
* during read operation.
*
* Note that since we registered the snapshot already, we guarantee
* that PopActiveSnapshot won't free it.
*/
PopActiveSnapshot();
readState->snapshot = newSnapshot;
/* not forget to unregister it when finishing read operation */
readState->snapshotRegisteredByUs = true;
}
/*
* CreateStripeReadMemoryContext creates a memory context to be used when
* reading a stripe.
@ -266,6 +360,27 @@ ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *col
}
/*
* ColumnarReadRowByRowNumberOrError is a wrapper around
* ColumnarReadRowByRowNumber that throws an error if tuple
* with rowNumber does not exist.
*/
void
ColumnarReadRowByRowNumberOrError(ColumnarReadState *readState,
uint64 rowNumber, Datum *columnValues,
bool *columnNulls)
{
if (!ColumnarReadRowByRowNumber(readState, rowNumber,
columnValues, columnNulls))
{
ereport(ERROR, (errmsg("cannot read from columnar table %s, tuple with "
"row number " UINT64_FORMAT " does not exist",
RelationGetRelationName(readState->relation),
rowNumber)));
}
}
/*
* ColumnarReadRowByRowNumber reads row with rowNumber from given relation
* into columnValues and columnNulls, and returns true. If no such row

View File

@ -254,56 +254,15 @@ CreateColumnarScanMemoryContext(void)
*/
static ColumnarReadState *
init_columnar_read_state(Relation relation, TupleDesc tupdesc, Bitmapset *attr_needed,
List *scanQual, MemoryContext scanContext, Snapshot snapshot)
List *scanQual, MemoryContext scanContext, Snapshot snapshot,
bool randomAccess)
{
MemoryContext oldContext = MemoryContextSwitchTo(scanContext);
Oid relfilenode = relation->rd_node.relNode;
FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId());
bool snapshotRegisteredByUs = false;
if (snapshot != InvalidSnapshot && IsMVCCSnapshot(snapshot))
{
/*
* If we flushed any pending writes, then we should guarantee that
* those writes are visible to us too. For this reason, if given
* snapshot is an MVCC snapshot, then we set its curcid to current
* command id.
*
* For simplicity, we do that even if we didn't flush any writes
* since we don't see any problem with that.
*
* XXX: We should either not update cid if we are executing a FETCH
* (from cursor) command, or we should have a better way to deal with
* pending writes, see the discussion in
* https://github.com/citusdata/citus/issues/5231.
*/
PushCopiedSnapshot(snapshot);
/* now our snapshot is the active one */
UpdateActiveSnapshotCommandId();
snapshot = GetActiveSnapshot();
RegisterSnapshot(snapshot);
/*
* To be able to use UpdateActiveSnapshotCommandId, we pushed the
* copied snapshot to the stack. However, we don't need to keep it
* there since we will anyway rely on ColumnarReadState->snapshot
* during read operation.
*
* Note that since we registered the snapshot already, we guarantee
* that PopActiveSnapshot won't free it.
*/
PopActiveSnapshot();
/* not forget to unregister it when finishing read operation */
snapshotRegisteredByUs = true;
}
List *neededColumnList = NeededColumnsList(tupdesc, attr_needed);
ColumnarReadState *readState = ColumnarBeginRead(relation, tupdesc, neededColumnList,
scanQual, scanContext, snapshot,
snapshotRegisteredByUs);
randomAccess);
MemoryContextSwitchTo(oldContext);
@ -354,10 +313,12 @@ columnar_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlo
*/
if (scan->cs_readState == NULL)
{
bool randomAccess = false;
scan->cs_readState =
init_columnar_read_state(scan->cs_base.rs_rd, slot->tts_tupleDescriptor,
scan->attr_needed, scan->scanQual,
scan->scanContext, scan->cs_base.rs_snapshot);
scan->scanContext, scan->cs_base.rs_snapshot,
randomAccess);
}
ExecClearTuple(slot);
@ -534,11 +495,12 @@ columnar_index_fetch_tuple(struct IndexFetchTableData *sscan,
/* no quals for index scan */
List *scanQual = NIL;
bool randomAccess = true;
scan->cs_readState = init_columnar_read_state(columnarRelation,
slot->tts_tupleDescriptor,
attr_needed, scanQual,
scan->scanContext,
snapshot);
snapshot, randomAccess);
}
uint64 rowNumber = tid_to_row_number(*tid);
@ -574,18 +536,61 @@ columnar_index_fetch_tuple(struct IndexFetchTableData *sscan,
}
else if (stripeWriteState == STRIPE_WRITE_IN_PROGRESS)
{
/* similar to aborted writes .. */
Assert(snapshot->snapshot_type == SNAPSHOT_DIRTY);
if (stripeMetadata->insertedByCurrentXact)
{
/*
* Stripe write is in progress and its entry is inserted by current
* transaction, so obviously it must be written by me. Since caller
* might want to use tupleslot datums for some reason, do another
* look-up, but this time by first flushing our writes.
*
* XXX: For index scan, this is the only case that we flush pending
* writes of the current backend. If we have taught reader how to
* read from WriteStateMap. then we could guarantee that
* index_fetch_tuple would never flush pending writes, but this seem
* to be too much work for now, but should be doable.
*/
ColumnarReadFlushPendingWrites(scan->cs_readState);
/*
* Stripe that "might" contain the tuple with rowNumber is not
* flushed yet. Here we set all attributes of given tupleslot to NULL
* before returning true and expect the indexAM callback that called
* us --possibly to check against constraint violation-- blocks until
* writer transaction commits or aborts, without requiring us to fill
* the tupleslot properly.
*/
memset(slot->tts_isnull, true, slot->tts_nvalid * sizeof(bool));
/*
* Fill the tupleslot and fall through to return true, it
* certainly exists.
*/
ColumnarReadRowByRowNumberOrError(scan->cs_readState, rowNumber,
slot->tts_values, slot->tts_isnull);
}
else
{
/* similar to aborted writes, it should be dirty snapshot */
Assert(snapshot->snapshot_type == SNAPSHOT_DIRTY);
/*
* Stripe that "might" contain the tuple with rowNumber is not
* flushed yet. Here we set all attributes of given tupleslot to NULL
* before returning true and expect the indexAM callback that called
* us --possibly to check against constraint violation-- blocks until
* writer transaction commits or aborts, without requiring us to fill
* the tupleslot properly.
*
* XXX: Note that the assumption we made above for the tupleslot
* holds for "unique" constraints defined on "btree" indexes.
*
* For the other constraints that we support, namely:
* * exclusion on btree,
* * exclusion on hash,
* * unique on btree;
* we still need to fill tts_values.
*
* However, for the same reason, we should have already flushed
* single tuple stripes when inserting into table for those three
* classes of constraints.
*
* This is annoying, but this also explains why this hack works for
* unique constraints on btree indexes, and also explains how we
* would never end up with "else" condition otherwise.
*/
memset(slot->tts_isnull, true, slot->tts_nvalid * sizeof(bool));
}
}
else
{
@ -902,9 +907,11 @@ columnar_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
Snapshot snapshot = SnapshotAny;
MemoryContext scanContext = CreateColumnarScanMemoryContext();
bool randomAccess = false;
ColumnarReadState *readState = init_columnar_read_state(OldHeap, sourceDesc,
attr_needed, scanQual,
scanContext, snapshot);
scanContext, snapshot,
randomAccess);
Datum *values = palloc0(sourceDesc->natts * sizeof(Datum));
bool *nulls = palloc0(sourceDesc->natts * sizeof(bool));

View File

@ -230,22 +230,32 @@ extern bool ContainsPendingWrites(ColumnarWriteState *state);
extern MemoryContext ColumnarWritePerTupleContext(ColumnarWriteState *state);
/* Function declarations for reading from columnar table */
/* functions applicable for both sequential and random access */
extern ColumnarReadState * ColumnarBeginRead(Relation relation,
TupleDesc tupleDescriptor,
List *projectedColumnList,
List *qualConditions,
MemoryContext scanContext,
Snapshot snaphot,
bool snapshotRegisteredByUs);
bool randomAccess);
extern void ColumnarReadFlushPendingWrites(ColumnarReadState *readState);
extern void ColumnarEndRead(ColumnarReadState *state);
extern void ColumnarResetRead(ColumnarReadState *readState);
/* functions only applicable for sequential access */
extern bool ColumnarReadNextRow(ColumnarReadState *state, Datum *columnValues,
bool *columnNulls, uint64 *rowNumber);
extern int64 ColumnarReadChunkGroupsFiltered(ColumnarReadState *state);
extern void ColumnarRescan(ColumnarReadState *readState, List *scanQual);
/* functions only applicable for random access */
extern void ColumnarReadRowByRowNumberOrError(ColumnarReadState *readState,
uint64 rowNumber, Datum *columnValues,
bool *columnNulls);
extern bool ColumnarReadRowByRowNumber(ColumnarReadState *readState,
uint64 rowNumber, Datum *columnValues,
bool *columnNulls);
extern void ColumnarEndRead(ColumnarReadState *state);
extern void ColumnarResetRead(ColumnarReadState *readState);
extern int64 ColumnarReadChunkGroupsFiltered(ColumnarReadState *state);
/* Function declarations for common functions */
extern FmgrInfo * GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId,

View File

@ -29,6 +29,14 @@ typedef struct StripeMetadata
/* see StripeWriteState */
bool aborted;
/*
* If write operation is in-progress (i.e. StripeWriteState returned
* STRIPE_WRITE_IN_PROGRESS), then insertedByCurrentXact is used to
* distinguish whether it's being written by current transaction or
* not.
*/
bool insertedByCurrentXact;
} StripeMetadata;
/*

View File

@ -589,5 +589,120 @@ BEGIN;
SET LOCAL max_parallel_workers_per_gather = 4;
create index on events (event_id);
COMMIT;
CREATE TABLE pending_index_scan(i INT UNIQUE) USING columnar;
BEGIN;
INSERT INTO pending_index_scan SELECT generate_series(1,100);
-- test index scan when there are pending writes
SET LOCAL enable_seqscan TO OFF;
SET LOCAL columnar.enable_custom_scan TO OFF;
SELECT COUNT(*)=100 FROM pending_index_scan ;
?column?
---------------------------------------------------------------------
t
(1 row)
COMMIT;
-- show that we don't flush single-tuple stripes due to aborted writes ...
create table uniq(i int unique) using columnar;
-- a) when table has a unique:
begin;
insert into uniq select generate_series(1,100);
-- i) abort before flushing
rollback;
insert into uniq select generate_series(1,100);
SELECT COUNT(*)=1 FROM columnar.stripe cs
WHERE cs.storage_id = columnar_test_helpers.columnar_relation_storageid('columnar_indexes.uniq'::regclass);
?column?
---------------------------------------------------------------------
t
(1 row)
TRUNCATE uniq;
begin;
insert into uniq select generate_series(1,100);
-- ii) abort after flushing
SELECT count(*) FROM uniq;
count
---------------------------------------------------------------------
100
(1 row)
rollback;
insert into uniq select generate_series(1,100);
SELECT COUNT(*)=1 FROM columnar.stripe cs
WHERE cs.storage_id = columnar_test_helpers.columnar_relation_storageid('columnar_indexes.uniq'::regclass);
?column?
---------------------------------------------------------------------
t
(1 row)
TRUNCATE uniq;
-- b) when table has a primary key:
begin;
insert into uniq select generate_series(1,100);
-- i) abort before flushing
rollback;
insert into uniq select generate_series(1,100);
SELECT COUNT(*)=1 FROM columnar.stripe cs
WHERE cs.storage_id = columnar_test_helpers.columnar_relation_storageid('columnar_indexes.uniq'::regclass);
?column?
---------------------------------------------------------------------
t
(1 row)
TRUNCATE uniq;
begin;
insert into uniq select generate_series(1,100);
-- ii) abort after flushing
SELECT count(*) FROM uniq;
count
---------------------------------------------------------------------
100
(1 row)
rollback;
insert into uniq select generate_series(1,100);
SELECT COUNT(*)=1 FROM columnar.stripe cs
WHERE cs.storage_id = columnar_test_helpers.columnar_relation_storageid('columnar_indexes.uniq'::regclass);
?column?
---------------------------------------------------------------------
t
(1 row)
TRUNCATE uniq;
begin;
SAVEPOINT svpt;
insert into uniq select generate_series(1,100);
ROLLBACK TO SAVEPOINT svpt;
-- Since we rollbacked the writes in the upper transaction, we don't need
-- to flush pending writes for uniquenes check when inserting the same
-- values. So the following insert should just work.
insert into uniq select generate_series(1,100);
-- didn't flush anything yet, but should see the in progress stripe-write
SELECT stripe_num, first_row_number, row_count FROM columnar.stripe cs
WHERE cs.storage_id = columnar_test_helpers.columnar_relation_storageid('columnar_indexes.uniq'::regclass);
stripe_num | first_row_number | row_count
---------------------------------------------------------------------
2 | 150001 | 0
(1 row)
commit;
-- should have completed the stripe reservation
SELECT stripe_num, first_row_number, row_count FROM columnar.stripe cs
WHERE cs.storage_id = columnar_test_helpers.columnar_relation_storageid('columnar_indexes.uniq'::regclass);
stripe_num | first_row_number | row_count
---------------------------------------------------------------------
2 | 150001 | 100
(1 row)
TRUNCATE uniq;
begin;
insert into uniq select generate_series(1,100);
SAVEPOINT svpt;
-- cannot verify unique constraint when there are pending writes in
-- the upper transaction
insert into uniq select generate_series(1,100);
ERROR: cannot read from index when there is unflushed data in upper transactions
rollback;
SET client_min_messages TO WARNING;
DROP SCHEMA columnar_indexes CASCADE;

View File

@ -456,5 +456,95 @@ BEGIN;
create index on events (event_id);
COMMIT;
CREATE TABLE pending_index_scan(i INT UNIQUE) USING columnar;
BEGIN;
INSERT INTO pending_index_scan SELECT generate_series(1,100);
-- test index scan when there are pending writes
SET LOCAL enable_seqscan TO OFF;
SET LOCAL columnar.enable_custom_scan TO OFF;
SELECT COUNT(*)=100 FROM pending_index_scan ;
COMMIT;
-- show that we don't flush single-tuple stripes due to aborted writes ...
create table uniq(i int unique) using columnar;
-- a) when table has a unique:
begin;
insert into uniq select generate_series(1,100);
-- i) abort before flushing
rollback;
insert into uniq select generate_series(1,100);
SELECT COUNT(*)=1 FROM columnar.stripe cs
WHERE cs.storage_id = columnar_test_helpers.columnar_relation_storageid('columnar_indexes.uniq'::regclass);
TRUNCATE uniq;
begin;
insert into uniq select generate_series(1,100);
-- ii) abort after flushing
SELECT count(*) FROM uniq;
rollback;
insert into uniq select generate_series(1,100);
SELECT COUNT(*)=1 FROM columnar.stripe cs
WHERE cs.storage_id = columnar_test_helpers.columnar_relation_storageid('columnar_indexes.uniq'::regclass);
TRUNCATE uniq;
-- b) when table has a primary key:
begin;
insert into uniq select generate_series(1,100);
-- i) abort before flushing
rollback;
insert into uniq select generate_series(1,100);
SELECT COUNT(*)=1 FROM columnar.stripe cs
WHERE cs.storage_id = columnar_test_helpers.columnar_relation_storageid('columnar_indexes.uniq'::regclass);
TRUNCATE uniq;
begin;
insert into uniq select generate_series(1,100);
-- ii) abort after flushing
SELECT count(*) FROM uniq;
rollback;
insert into uniq select generate_series(1,100);
SELECT COUNT(*)=1 FROM columnar.stripe cs
WHERE cs.storage_id = columnar_test_helpers.columnar_relation_storageid('columnar_indexes.uniq'::regclass);
TRUNCATE uniq;
begin;
SAVEPOINT svpt;
insert into uniq select generate_series(1,100);
ROLLBACK TO SAVEPOINT svpt;
-- Since we rollbacked the writes in the upper transaction, we don't need
-- to flush pending writes for uniquenes check when inserting the same
-- values. So the following insert should just work.
insert into uniq select generate_series(1,100);
-- didn't flush anything yet, but should see the in progress stripe-write
SELECT stripe_num, first_row_number, row_count FROM columnar.stripe cs
WHERE cs.storage_id = columnar_test_helpers.columnar_relation_storageid('columnar_indexes.uniq'::regclass);
commit;
-- should have completed the stripe reservation
SELECT stripe_num, first_row_number, row_count FROM columnar.stripe cs
WHERE cs.storage_id = columnar_test_helpers.columnar_relation_storageid('columnar_indexes.uniq'::regclass);
TRUNCATE uniq;
begin;
insert into uniq select generate_series(1,100);
SAVEPOINT svpt;
-- cannot verify unique constraint when there are pending writes in
-- the upper transaction
insert into uniq select generate_series(1,100);
rollback;
SET client_min_messages TO WARNING;
DROP SCHEMA columnar_indexes CASCADE;