Merge pull request #5058 from citusdata/col/optimize-index-read

Use long-lasting mem cxt during columnar index scan & optimize correlated ones
pull/5143/head
Onur Tirtir 2021-08-02 11:06:57 +03:00 committed by GitHub
commit 38940ed2a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 385 additions and 80 deletions

View File

@ -85,10 +85,14 @@ struct ColumnarReadState
/* static function declarations */
static MemoryContext CreateStripeReadMemoryContext(void);
static void ReadStripeRowByRowNumber(StripeReadState *stripeReadState,
StripeMetadata *stripeMetadata,
static bool ColumnarReadIsCurrentStripe(ColumnarReadState *readState,
uint64 rowNumber);
static StripeMetadata * ColumnarReadGetCurrentStripe(ColumnarReadState *readState);
static void ReadStripeRowByRowNumber(ColumnarReadState *readState,
uint64 rowNumber, Datum *columnValues,
bool *columnNulls);
static bool StripeReadIsCurrentChunkGroup(StripeReadState *stripeReadState,
int chunkGroupIndex);
static void ReadChunkGroupRowByRowOffset(ChunkGroupReadState *chunkGroupReadState,
StripeMetadata *stripeMetadata,
uint64 stripeRowOffset, Datum *columnValues,
@ -99,7 +103,6 @@ static StripeReadState * BeginStripeRead(StripeMetadata *stripeMetadata, Relatio
TupleDesc tupleDesc, List *projectedColumnList,
List *whereClauseList, List *whereClauseVars,
MemoryContext stripeReadContext);
static void EndStripeRead(StripeReadState *stripeReadState);
static void AdvanceStripeRead(ColumnarReadState *readState);
static bool ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues,
bool *columnNulls);
@ -246,50 +249,92 @@ ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *col
* exists, then returns false.
*/
bool
ColumnarReadRowByRowNumber(Relation relation, uint64 rowNumber,
List *neededColumnList, Datum *columnValues,
ColumnarReadRowByRowNumber(ColumnarReadState *readState,
uint64 rowNumber, Datum *columnValues,
bool *columnNulls, Snapshot snapshot)
{
StripeMetadata *stripeMetadata = FindStripeByRowNumber(relation, rowNumber, snapshot);
if (!ColumnarReadIsCurrentStripe(readState, rowNumber))
{
Relation columnarRelation = readState->relation;
StripeMetadata *stripeMetadata = FindStripeByRowNumber(columnarRelation,
rowNumber, snapshot);
if (stripeMetadata == NULL)
{
/* no such row exists */
return false;
}
TupleDesc relationTupleDesc = RelationGetDescr(relation);
/* do the cleanup before reading a new stripe */
ColumnarResetRead(readState);
TupleDesc relationTupleDesc = RelationGetDescr(columnarRelation);
List *whereClauseList = NIL;
List *whereClauseVars = NIL;
MemoryContext stripeReadContext = CreateStripeReadMemoryContext();
StripeReadState *stripeReadState = BeginStripeRead(stripeMetadata,
relation,
MemoryContext stripeReadContext = readState->stripeReadContext;
readState->stripeReadState = BeginStripeRead(stripeMetadata,
columnarRelation,
relationTupleDesc,
neededColumnList,
readState->projectedColumnList,
whereClauseList,
whereClauseVars,
stripeReadContext);
ReadStripeRowByRowNumber(stripeReadState, stripeMetadata, rowNumber,
columnValues, columnNulls);
readState->currentStripeMetadata = stripeMetadata;
}
EndStripeRead(stripeReadState);
MemoryContextReset(stripeReadContext);
ReadStripeRowByRowNumber(readState, rowNumber, columnValues, columnNulls);
return true;
}
/*
 * ColumnarReadIsCurrentStripe tells whether the row identified by rowNumber
 * belongs to the stripe that readState is reading at the moment.
 *
 * Returns false when no stripe read is in progress.
 */
static bool
ColumnarReadIsCurrentStripe(ColumnarReadState *readState, uint64 rowNumber)
{
	if (StripeReadInProgress(readState))
	{
		StripeMetadata *stripe = readState->currentStripeMetadata;

		/* both ends of the stripe's row number range are inclusive */
		return stripe->firstRowNumber <= rowNumber &&
			   rowNumber <= StripeGetHighestRowNumber(stripe);
	}

	return false;
}
/*
 * ColumnarReadGetCurrentStripe is a small accessor that hands back the
 * StripeMetadata kept in readState for the stripe that is being read.
 */
static StripeMetadata *
ColumnarReadGetCurrentStripe(ColumnarReadState *readState)
{
	StripeMetadata *currentStripeMetadata = readState->currentStripeMetadata;

	return currentStripeMetadata;
}
/*
* ReadStripeRowByRowNumber reads row with rowNumber from given
* stripeReadState into columnValues and columnNulls.
* Errors out if no such row exists in the stripe being read.
*/
static void
ReadStripeRowByRowNumber(StripeReadState *stripeReadState,
StripeMetadata *stripeMetadata,
ReadStripeRowByRowNumber(ColumnarReadState *readState,
uint64 rowNumber, Datum *columnValues,
bool *columnNulls)
{
StripeMetadata *stripeMetadata = ColumnarReadGetCurrentStripe(readState);
StripeReadState *stripeReadState = readState->stripeReadState;
if (rowNumber < stripeMetadata->firstRowNumber)
{
/* not expected but be on the safe side */
@ -298,21 +343,42 @@ ReadStripeRowByRowNumber(StripeReadState *stripeReadState,
/* find the exact chunk group to be read */
uint64 stripeRowOffset = rowNumber - stripeMetadata->firstRowNumber;
stripeReadState->chunkGroupIndex = stripeRowOffset /
stripeMetadata->chunkGroupRowCount;
int chunkGroupIndex = stripeRowOffset / stripeMetadata->chunkGroupRowCount;
if (!StripeReadIsCurrentChunkGroup(stripeReadState, chunkGroupIndex))
{
if (stripeReadState->chunkGroupReadState)
{
EndChunkGroupRead(stripeReadState->chunkGroupReadState);
}
stripeReadState->chunkGroupIndex = chunkGroupIndex;
stripeReadState->chunkGroupReadState = BeginChunkGroupRead(
stripeReadState->stripeBuffers,
stripeReadState->chunkGroupIndex,
stripeReadState->tupleDescriptor,
stripeReadState->projectedColumnList,
stripeReadState->stripeReadContext);
}
ReadChunkGroupRowByRowOffset(stripeReadState->chunkGroupReadState,
stripeMetadata, stripeRowOffset,
columnValues, columnNulls);
}
EndChunkGroupRead(stripeReadState->chunkGroupReadState);
stripeReadState->chunkGroupReadState = NULL;
/*
 * StripeReadIsCurrentChunkGroup returns true if a chunk group read is in
 * progress for given stripeReadState and the chunk group being read is the
 * one that has given chunkGroupIndex in its stripe.
 */
static bool
StripeReadIsCurrentChunkGroup(StripeReadState *stripeReadState, int chunkGroupIndex)
{
	if (stripeReadState->chunkGroupReadState)
	{
		return stripeReadState->chunkGroupIndex == chunkGroupIndex;
	}

	/* no chunk group has been opened, so nothing can match */
	return false;
}
@ -390,6 +456,24 @@ ColumnarEndRead(ColumnarReadState *readState)
}
/*
 * ColumnarResetRead discards the stripe and the chunk group that readState
 * is currently reading. It does nothing if no stripe read is in progress.
 */
void
ColumnarResetRead(ColumnarReadState *readState)
{
	if (!StripeReadInProgress(readState))
	{
		/* nothing to clean up */
		return;
	}

	pfree(readState->currentStripeMetadata);
	readState->currentStripeMetadata = NULL;

	/* drop our pointer to the stripe read state, then reset its context */
	readState->stripeReadState = NULL;
	MemoryContextReset(readState->stripeReadContext);
}
/*
* BeginStripeRead allocates state for reading a stripe.
*/
@ -427,16 +511,6 @@ BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDes
}
/*
* EndStripeRead finishes a stripe read.
*/
static void
EndStripeRead(StripeReadState *stripeReadState)
{
pfree(stripeReadState);
}
/*
* AdvanceStripeRead updates chunkGroupsFiltered and sets
* currentStripeMetadata for next stripe read.
@ -449,9 +523,6 @@ AdvanceStripeRead(ColumnarReadState *readState)
uint64 lastReadRowNumber =
StripeGetHighestRowNumber(readState->currentStripeMetadata);
EndStripeRead(readState->stripeReadState);
readState->currentStripeMetadata = FindNextStripeByRowNumber(readState->relation,
lastReadRowNumber,
GetTransactionSnapshot());
@ -1307,13 +1378,6 @@ DeserializeChunkData(StripeBuffers *stripeBuffers, uint64 chunkIndex,
chunkBuffers->valueCompressionType,
chunkBuffers->decompressedValueSize);
if (chunkBuffers->valueCompressionType != COMPRESSION_NONE)
{
/* compressed data is not needed anymore */
pfree(chunkBuffers->valueBuffer->data);
pfree(chunkBuffers->valueBuffer);
}
DeserializeBoolArray(chunkBuffers->existsBuffer,
chunkData->existsArray[columnIndex],
rowCount);

View File

@ -87,10 +87,30 @@ typedef struct ColumnarScanDescData
typedef struct ColumnarScanDescData *ColumnarScanDesc;
/*
 * IndexFetchColumnarData is the scan state passed between index_fetch_begin,
 * index_fetch_reset, index_fetch_end, index_fetch_tuple calls.
 *
 * columnar_index_fetch_begin returns a pointer to cs_base, and
 * columnar_index_fetch_end / columnar_index_fetch_tuple cast the
 * IndexFetchTableData pointer they receive back to IndexFetchColumnarData.
 * For that cast to be valid, cs_base has to stay the first member.
 */
typedef struct IndexFetchColumnarData
{
/* base scan state; its rel field holds the relation being fetched from */
IndexFetchTableData cs_base;
/* read state reused across fetches; NULL until the first fetch sets it up */
ColumnarReadState *cs_readState;
/*
 * We initialize cs_readState lazily in the first columnar_index_fetch_tuple
 * call. However, we want to do memory allocations in a sub MemoryContext of
 * columnar_index_fetch_begin. For this reason, we store scanContext in
 * columnar_index_fetch_begin.
 */
MemoryContext scanContext;
} IndexFetchColumnarData;
static object_access_hook_type PrevObjectAccessHook = NULL;
static ProcessUtility_hook_type PrevProcessUtilityHook = NULL;
/* forward declaration for static functions */
static MemoryContext CreateColumnarScanMemoryContext(void);
static void ColumnarTableDropHook(Oid tgid);
static void ColumnarTriggerCreateHook(Oid tgid);
static void ColumnarTableAMObjectAccessHook(ObjectAccessType access, Oid classId,
@ -178,12 +198,7 @@ columnar_beginscan_extended(Relation relation, Snapshot snapshot,
* initialized read state. We assume that beginscan is called in a
* context that will last until end of scan.
*/
MemoryContext scanContext =
AllocSetContextCreate(
CurrentMemoryContext,
"Column Store Scan Context",
ALLOCSET_DEFAULT_SIZES);
MemoryContext scanContext = CreateColumnarScanMemoryContext();
MemoryContext oldContext = MemoryContextSwitchTo(scanContext);
ColumnarScanDesc scan = palloc0(sizeof(ColumnarScanDescData));
@ -220,6 +235,18 @@ columnar_beginscan_extended(Relation relation, Snapshot snapshot,
}
/*
 * CreateColumnarScanMemoryContext creates a memory context, as a child of
 * CurrentMemoryContext, that is meant to store ColumnarReadState in it.
 */
static MemoryContext
CreateColumnarScanMemoryContext(void)
{
	MemoryContext scanContext = AllocSetContextCreate(CurrentMemoryContext,
													  "Columnar Scan Context",
													  ALLOCSET_DEFAULT_SIZES);

	return scanContext;
}
/*
* init_columnar_read_state initializes a column store table read and returns the
* state.
@ -401,29 +428,43 @@ columnar_index_fetch_begin(Relation rel)
FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId());
IndexFetchTableData *scan = palloc0(sizeof(IndexFetchTableData));
scan->rel = rel;
return scan;
MemoryContext scanContext = CreateColumnarScanMemoryContext();
MemoryContext oldContext = MemoryContextSwitchTo(scanContext);
IndexFetchColumnarData *scan = palloc0(sizeof(IndexFetchColumnarData));
scan->cs_base.rel = rel;
scan->cs_readState = NULL;
scan->scanContext = scanContext;
MemoryContextSwitchTo(oldContext);
return &scan->cs_base;
}
static void
columnar_index_fetch_reset(IndexFetchTableData *scan)
columnar_index_fetch_reset(IndexFetchTableData *sscan)
{
/* no-op */
}
static void
columnar_index_fetch_end(IndexFetchTableData *scan)
columnar_index_fetch_end(IndexFetchTableData *sscan)
{
columnar_index_fetch_reset(scan);
pfree(scan);
columnar_index_fetch_reset(sscan);
IndexFetchColumnarData *scan = (IndexFetchColumnarData *) sscan;
if (scan->cs_readState)
{
ColumnarEndRead(scan->cs_readState);
scan->cs_readState = NULL;
}
}
static bool
columnar_index_fetch_tuple(struct IndexFetchTableData *scan,
columnar_index_fetch_tuple(struct IndexFetchTableData *sscan,
ItemPointer tid,
Snapshot snapshot,
TupleTableSlot *slot,
@ -443,19 +484,35 @@ columnar_index_fetch_tuple(struct IndexFetchTableData *scan,
ExecClearTuple(slot);
IndexFetchColumnarData *scan = (IndexFetchColumnarData *) sscan;
Relation columnarRelation = scan->cs_base.rel;
/* initialize read state for the first row */
if (scan->cs_readState == NULL)
{
MemoryContext oldContext = MemoryContextSwitchTo(scan->scanContext);
/* we need all columns */
int natts = scan->rel->rd_att->natts;
int natts = columnarRelation->rd_att->natts;
Bitmapset *attr_needed = bms_add_range(NULL, 0, natts - 1);
TupleDesc relationTupleDesc = RelationGetDescr(scan->rel);
List *relationColumnList = NeededColumnsList(relationTupleDesc, attr_needed);
/* no quals for index scan */
List *scanQual = NIL;
scan->cs_readState = init_columnar_read_state(columnarRelation,
slot->tts_tupleDescriptor,
attr_needed, scanQual);
MemoryContextSwitchTo(oldContext);
}
uint64 rowNumber = tid_to_row_number(*tid);
if (!ColumnarReadRowByRowNumber(scan->rel, rowNumber, relationColumnList,
slot->tts_values, slot->tts_isnull, snapshot))
if (!ColumnarReadRowByRowNumber(scan->cs_readState, rowNumber, slot->tts_values,
slot->tts_isnull, snapshot))
{
return false;
}
slot->tts_tableOid = RelationGetRelid(scan->rel);
slot->tts_tableOid = RelationGetRelid(columnarRelation);
slot->tts_tid = *tid;
ExecStoreVirtualTuple(slot);

View File

@ -216,10 +216,11 @@ extern ColumnarReadState * ColumnarBeginRead(Relation relation,
extern bool ColumnarReadNextRow(ColumnarReadState *state, Datum *columnValues,
bool *columnNulls, uint64 *rowNumber);
extern void ColumnarRescan(ColumnarReadState *readState);
extern bool ColumnarReadRowByRowNumber(Relation relation, uint64 rowNumber,
List *neededColumnList, Datum *columnValues,
extern bool ColumnarReadRowByRowNumber(ColumnarReadState *readState,
uint64 rowNumber, Datum *columnValues,
bool *columnNulls, Snapshot snapshot);
extern void ColumnarEndRead(ColumnarReadState *state);
extern void ColumnarResetRead(ColumnarReadState *readState);
extern int64 ColumnarReadChunkGroupsFiltered(ColumnarReadState *state);
/* Function declarations for common functions */

View File

@ -52,6 +52,7 @@ SELECT * FROM t;
-- make sure that we test index scan
set columnar.enable_custom_scan to 'off';
set enable_seqscan to off;
set seq_page_cost TO 10000000;
CREATE table columnar_table (a INT, b int) USING columnar;
INSERT INTO columnar_table (a) VALUES (1), (1);
CREATE UNIQUE INDEX CONCURRENTLY ON columnar_table (a);
@ -234,6 +235,28 @@ SELECT (SELECT b FROM columnar_table WHERE a = 150000)=300000;
t
(1 row)
-- Since our index is highly correlated with the relation itself, we should
-- de-serialize each chunk group only once. For this reason, if this test
-- file hangs on below queries, then you should think that we are not properly
-- caching the last-read chunk group during index reads.
SELECT SUM(a)=312487500 FROM columnar_table WHERE a < 25000;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT SUM(a)=167000 FROM columnar_table WHERE a = 16000 OR a = 151000;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT SUM(a)=48000 FROM columnar_table WHERE a = 16000 OR a = 32000;
?column?
---------------------------------------------------------------------
t
(1 row)
TRUNCATE columnar_table;
ALTER TABLE columnar_table DROP CONSTRAINT columnar_table_pkey;
-- hash --
@ -435,5 +458,89 @@ NOTICE: falling back to serial index build since parallel scan on columnar tabl
REINDEX TABLE CONCURRENTLY parallel_scan_test;
NOTICE: falling back to serial index build since parallel scan on columnar tables is not supported
NOTICE: falling back to serial index build since parallel scan on columnar tables is not supported
-- test with different data types & indexAM's --
CREATE TABLE hash_text(a INT, b TEXT) USING columnar;
INSERT INTO hash_text SELECT i, (i*2)::TEXT FROM generate_series(1, 10) i;
CREATE INDEX ON hash_text USING hash (b);
SELECT b FROM hash_text WHERE b='10';
b
---------------------------------------------------------------------
10
(1 row)
CREATE TABLE hash_int(a INT, b TEXT) USING columnar;
INSERT INTO hash_int SELECT i, (i*3)::TEXT FROM generate_series(1, 10) i;
CREATE INDEX ON hash_int USING hash (a);
SELECT b='15' FROM hash_int WHERE a=5;
?column?
---------------------------------------------------------------------
t
(1 row)
CREATE TABLE mixed_data_types (
timestamp_col timestamp,
box_col box,
circle_col circle,
float_col float,
uuid_col uuid,
text_col text,
numeric_col numeric,
PRIMARY KEY(timestamp_col, text_col)
) USING columnar;
INSERT INTO mixed_data_types
SELECT
to_timestamp(i+36000),
box(point(i, i+90)),
circle(point(i*2, i*3), i*100),
(i*1.2)::float,
uuid_in(md5((i*10)::text || (i*15)::text)::cstring),
(i*8)::text,
(i*42)::numeric
FROM generate_series(1, 10) i;
SELECT text_col='64'
FROM mixed_data_types WHERE timestamp_col='1970-01-01 02:00:08';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT uuid_col='298923c8-1900-45e9-1288-b430794814c4'
FROM mixed_data_types WHERE timestamp_col='1970-01-01 02:00:01';
?column?
---------------------------------------------------------------------
t
(1 row)
CREATE INDEX hash_uuid ON mixed_data_types USING hash(uuid_col);
SELECT box_col=box(point(1, 91)) AND timestamp_col='1970-01-01 02:00:01'
FROM mixed_data_types WHERE uuid_col='298923c8-1900-45e9-1288-b430794814c4';
?column?
---------------------------------------------------------------------
t
(1 row)
DROP INDEX hash_uuid;
CREATE INDEX btree_multi_numeric_text_timestamp
ON mixed_data_types (numeric_col, text_col, timestamp_col);
SELECT uuid_col='ab2481c9-f93d-0ed3-033a-3281d865ccb2'
FROM mixed_data_types
WHERE
numeric_col >= 120 AND numeric_col <= 220 AND
circle_col >= circle(point(7, 7), 350) AND
float_col <= 5.0;
?column?
---------------------------------------------------------------------
t
(1 row)
CREATE TABLE revisit_same_cgroup(a INT, b TEXT) USING columnar;
CREATE INDEX ON revisit_same_cgroup USING HASH (b);
INSERT INTO revisit_same_cgroup SELECT random()*500, (random()*500)::INT::TEXT FROM generate_series(1, 100000) i;
SELECT sum(a)>-1 FROM revisit_same_cgroup WHERE b = '1';
?column?
---------------------------------------------------------------------
t
(1 row)
SET client_min_messages TO WARNING;
DROP SCHEMA columnar_indexes CASCADE;

View File

@ -25,6 +25,7 @@ SELECT * FROM t;
-- make sure that we test index scan
set columnar.enable_custom_scan to 'off';
set enable_seqscan to off;
set seq_page_cost TO 10000000;
CREATE table columnar_table (a INT, b int) USING columnar;
@ -158,6 +159,14 @@ TRUNCATE columnar_table;
INSERT INTO columnar_table (a, b) SELECT i,i*2 FROM generate_series(1, 160000) i;
SELECT (SELECT b FROM columnar_table WHERE a = 150000)=300000;
-- Since our index is highly correlated with the relation itself, we should
-- de-serialize each chunk group only once. For this reason, if this test
-- file hangs on below queries, then you should think that we are not properly
-- caching the last-read chunk group during index reads.
SELECT SUM(a)=312487500 FROM columnar_table WHERE a < 25000;
SELECT SUM(a)=167000 FROM columnar_table WHERE a = 16000 OR a = 151000;
SELECT SUM(a)=48000 FROM columnar_table WHERE a = 16000 OR a = 32000;
TRUNCATE columnar_table;
ALTER TABLE columnar_table DROP CONSTRAINT columnar_table_pkey;
@ -321,5 +330,72 @@ REINDEX TABLE parallel_scan_test;
CREATE INDEX CONCURRENTLY ON parallel_scan_test (a);
REINDEX TABLE CONCURRENTLY parallel_scan_test;
-- test with different data types & indexAM's --
CREATE TABLE hash_text(a INT, b TEXT) USING columnar;
INSERT INTO hash_text SELECT i, (i*2)::TEXT FROM generate_series(1, 10) i;
CREATE INDEX ON hash_text USING hash (b);
SELECT b FROM hash_text WHERE b='10';
CREATE TABLE hash_int(a INT, b TEXT) USING columnar;
INSERT INTO hash_int SELECT i, (i*3)::TEXT FROM generate_series(1, 10) i;
CREATE INDEX ON hash_int USING hash (a);
SELECT b='15' FROM hash_int WHERE a=5;
CREATE TABLE mixed_data_types (
timestamp_col timestamp,
box_col box,
circle_col circle,
float_col float,
uuid_col uuid,
text_col text,
numeric_col numeric,
PRIMARY KEY(timestamp_col, text_col)
) USING columnar;
INSERT INTO mixed_data_types
SELECT
to_timestamp(i+36000),
box(point(i, i+90)),
circle(point(i*2, i*3), i*100),
(i*1.2)::float,
uuid_in(md5((i*10)::text || (i*15)::text)::cstring),
(i*8)::text,
(i*42)::numeric
FROM generate_series(1, 10) i;
SELECT text_col='64'
FROM mixed_data_types WHERE timestamp_col='1970-01-01 02:00:08';
SELECT uuid_col='298923c8-1900-45e9-1288-b430794814c4'
FROM mixed_data_types WHERE timestamp_col='1970-01-01 02:00:01';
CREATE INDEX hash_uuid ON mixed_data_types USING hash(uuid_col);
SELECT box_col=box(point(1, 91)) AND timestamp_col='1970-01-01 02:00:01'
FROM mixed_data_types WHERE uuid_col='298923c8-1900-45e9-1288-b430794814c4';
DROP INDEX hash_uuid;
CREATE INDEX btree_multi_numeric_text_timestamp
ON mixed_data_types (numeric_col, text_col, timestamp_col);
SELECT uuid_col='ab2481c9-f93d-0ed3-033a-3281d865ccb2'
FROM mixed_data_types
WHERE
numeric_col >= 120 AND numeric_col <= 220 AND
circle_col >= circle(point(7, 7), 350) AND
float_col <= 5.0;
CREATE TABLE revisit_same_cgroup(a INT, b TEXT) USING columnar;
CREATE INDEX ON revisit_same_cgroup USING HASH (b);
INSERT INTO revisit_same_cgroup SELECT random()*500, (random()*500)::INT::TEXT FROM generate_series(1, 100000) i;
SELECT sum(a)>-1 FROM revisit_same_cgroup WHERE b = '1';
SET client_min_messages TO WARNING;
DROP SCHEMA columnar_indexes CASCADE;