mirror of https://github.com/citusdata/citus.git
Columnar: use clause Vars for chunk group filtering. (#4856)
* Columnar: use clause Vars for chunk group filtering. This solves #4780 and also provides a cleaner separation between chunk group filtering and projection pushdown. * Columnar: sort and deduplicate Vars pulled from clauses. * Columnar: cleanup variable names. * Columnar: remove alternate test output. * Columnar: do not recurse when looking for whereClauseVars. Co-authored-by: Jeff Davis <jefdavi@microsoft.com>pull/4823/head
parent
57c3e226cb
commit
063e673038
|
@ -77,6 +77,8 @@ struct ColumnarReadState
|
||||||
List *projectedColumnList;
|
List *projectedColumnList;
|
||||||
|
|
||||||
List *whereClauseList;
|
List *whereClauseList;
|
||||||
|
List *whereClauseVars;
|
||||||
|
|
||||||
MemoryContext stripeReadContext;
|
MemoryContext stripeReadContext;
|
||||||
int64 chunkGroupsFiltered;
|
int64 chunkGroupsFiltered;
|
||||||
};
|
};
|
||||||
|
@ -84,8 +86,8 @@ struct ColumnarReadState
|
||||||
/* static function declarations */
|
/* static function declarations */
|
||||||
static StripeReadState * BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel,
|
static StripeReadState * BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel,
|
||||||
TupleDesc tupleDesc, List *projectedColumnList,
|
TupleDesc tupleDesc, List *projectedColumnList,
|
||||||
List *whereClauseList, MemoryContext
|
List *whereClauseList, List *whereClauseVars,
|
||||||
stripeReadContext);
|
MemoryContext stripeReadContext);
|
||||||
static void EndStripeRead(StripeReadState *stripeReadState);
|
static void EndStripeRead(StripeReadState *stripeReadState);
|
||||||
static bool ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues,
|
static bool ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues,
|
||||||
bool *columnNulls);
|
bool *columnNulls);
|
||||||
|
@ -103,15 +105,17 @@ static StripeBuffers * LoadFilteredStripeBuffers(Relation relation,
|
||||||
TupleDesc tupleDescriptor,
|
TupleDesc tupleDescriptor,
|
||||||
List *projectedColumnList,
|
List *projectedColumnList,
|
||||||
List *whereClauseList,
|
List *whereClauseList,
|
||||||
|
List *whereClauseVars,
|
||||||
int64 *chunkGroupsFiltered);
|
int64 *chunkGroupsFiltered);
|
||||||
static ColumnBuffers * LoadColumnBuffers(Relation relation,
|
static ColumnBuffers * LoadColumnBuffers(Relation relation,
|
||||||
ColumnChunkSkipNode *chunkSkipNodeArray,
|
ColumnChunkSkipNode *chunkSkipNodeArray,
|
||||||
uint32 chunkCount, uint64 stripeOffset,
|
uint32 chunkCount, uint64 stripeOffset,
|
||||||
Form_pg_attribute attributeForm);
|
Form_pg_attribute attributeForm);
|
||||||
static bool * SelectedChunkMask(StripeSkipList *stripeSkipList,
|
static bool * SelectedChunkMask(StripeSkipList *stripeSkipList,
|
||||||
List *projectedColumnList, List *whereClauseList,
|
List *whereClauseList, List *whereClauseVars,
|
||||||
int64 *chunkGroupsFiltered);
|
int64 *chunkGroupsFiltered);
|
||||||
static Node * BuildBaseConstraint(Var *variable);
|
static Node * BuildBaseConstraint(Var *variable);
|
||||||
|
static List * GetClauseVars(List *clauses, int natts);
|
||||||
static OpExpr * MakeOpExpression(Var *variable, int16 strategyNumber);
|
static OpExpr * MakeOpExpression(Var *variable, int16 strategyNumber);
|
||||||
static Oid GetOperatorByType(Oid typeId, Oid accessMethodId, int16 strategyNumber);
|
static Oid GetOperatorByType(Oid typeId, Oid accessMethodId, int16 strategyNumber);
|
||||||
static void UpdateConstraint(Node *baseConstraint, Datum minValue, Datum maxValue);
|
static void UpdateConstraint(Node *baseConstraint, Datum minValue, Datum maxValue);
|
||||||
|
@ -163,6 +167,7 @@ ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
|
||||||
readState->stripeList = stripeList;
|
readState->stripeList = stripeList;
|
||||||
readState->projectedColumnList = projectedColumnList;
|
readState->projectedColumnList = projectedColumnList;
|
||||||
readState->whereClauseList = whereClauseList;
|
readState->whereClauseList = whereClauseList;
|
||||||
|
readState->whereClauseVars = GetClauseVars(whereClauseList, tupleDescriptor->natts);
|
||||||
readState->chunkGroupsFiltered = 0;
|
readState->chunkGroupsFiltered = 0;
|
||||||
readState->tupleDescriptor = tupleDescriptor;
|
readState->tupleDescriptor = tupleDescriptor;
|
||||||
readState->stripeReadContext = stripeReadContext;
|
readState->stripeReadContext = stripeReadContext;
|
||||||
|
@ -199,6 +204,7 @@ ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *col
|
||||||
readState->tupleDescriptor,
|
readState->tupleDescriptor,
|
||||||
readState->projectedColumnList,
|
readState->projectedColumnList,
|
||||||
readState->whereClauseList,
|
readState->whereClauseList,
|
||||||
|
readState->whereClauseVars,
|
||||||
readState->stripeReadContext);
|
readState->stripeReadContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -251,8 +257,8 @@ ColumnarEndRead(ColumnarReadState *readState)
|
||||||
*/
|
*/
|
||||||
static StripeReadState *
|
static StripeReadState *
|
||||||
BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDesc,
|
BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDesc,
|
||||||
List *projectedColumnList, List *whereClauseList, MemoryContext
|
List *projectedColumnList, List *whereClauseList, List *whereClauseVars,
|
||||||
stripeReadContext)
|
MemoryContext stripeReadContext)
|
||||||
{
|
{
|
||||||
MemoryContext oldContext = MemoryContextSwitchTo(stripeReadContext);
|
MemoryContext oldContext = MemoryContextSwitchTo(stripeReadContext);
|
||||||
|
|
||||||
|
@ -270,6 +276,7 @@ BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDes
|
||||||
tupleDesc,
|
tupleDesc,
|
||||||
projectedColumnList,
|
projectedColumnList,
|
||||||
whereClauseList,
|
whereClauseList,
|
||||||
|
whereClauseVars,
|
||||||
&stripeReadState->
|
&stripeReadState->
|
||||||
chunkGroupsFiltered);
|
chunkGroupsFiltered);
|
||||||
|
|
||||||
|
@ -533,7 +540,8 @@ ColumnarTableRowCount(Relation relation)
|
||||||
static StripeBuffers *
|
static StripeBuffers *
|
||||||
LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
||||||
TupleDesc tupleDescriptor, List *projectedColumnList,
|
TupleDesc tupleDescriptor, List *projectedColumnList,
|
||||||
List *whereClauseList, int64 *chunkGroupsFiltered)
|
List *whereClauseList, List *whereClauseVars,
|
||||||
|
int64 *chunkGroupsFiltered)
|
||||||
{
|
{
|
||||||
uint32 columnIndex = 0;
|
uint32 columnIndex = 0;
|
||||||
uint32 columnCount = tupleDescriptor->natts;
|
uint32 columnCount = tupleDescriptor->natts;
|
||||||
|
@ -545,8 +553,8 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
||||||
tupleDescriptor,
|
tupleDescriptor,
|
||||||
stripeMetadata->chunkCount);
|
stripeMetadata->chunkCount);
|
||||||
|
|
||||||
bool *selectedChunkMask = SelectedChunkMask(stripeSkipList, projectedColumnList,
|
bool *selectedChunkMask = SelectedChunkMask(stripeSkipList, whereClauseList,
|
||||||
whereClauseList, chunkGroupsFiltered);
|
whereClauseVars, chunkGroupsFiltered);
|
||||||
|
|
||||||
StripeSkipList *selectedChunkSkipList =
|
StripeSkipList *selectedChunkSkipList =
|
||||||
SelectedChunkSkipList(stripeSkipList, projectedColumnMask,
|
SelectedChunkSkipList(stripeSkipList, projectedColumnMask,
|
||||||
|
@ -646,8 +654,8 @@ LoadColumnBuffers(Relation relation, ColumnChunkSkipNode *chunkSkipNodeArray,
|
||||||
* the chunk can be refuted by the given qualifier conditions.
|
* the chunk can be refuted by the given qualifier conditions.
|
||||||
*/
|
*/
|
||||||
static bool *
|
static bool *
|
||||||
SelectedChunkMask(StripeSkipList *stripeSkipList, List *projectedColumnList,
|
SelectedChunkMask(StripeSkipList *stripeSkipList, List *whereClauseList,
|
||||||
List *whereClauseList, int64 *chunkGroupsFiltered)
|
List *whereClauseVars, int64 *chunkGroupsFiltered)
|
||||||
{
|
{
|
||||||
ListCell *columnCell = NULL;
|
ListCell *columnCell = NULL;
|
||||||
uint32 chunkIndex = 0;
|
uint32 chunkIndex = 0;
|
||||||
|
@ -655,7 +663,7 @@ SelectedChunkMask(StripeSkipList *stripeSkipList, List *projectedColumnList,
|
||||||
bool *selectedChunkMask = palloc0(stripeSkipList->chunkCount * sizeof(bool));
|
bool *selectedChunkMask = palloc0(stripeSkipList->chunkCount * sizeof(bool));
|
||||||
memset(selectedChunkMask, true, stripeSkipList->chunkCount * sizeof(bool));
|
memset(selectedChunkMask, true, stripeSkipList->chunkCount * sizeof(bool));
|
||||||
|
|
||||||
foreach(columnCell, projectedColumnList)
|
foreach(columnCell, whereClauseVars)
|
||||||
{
|
{
|
||||||
Var *column = lfirst(columnCell);
|
Var *column = lfirst(columnCell);
|
||||||
uint32 columnIndex = column->varattno - 1;
|
uint32 columnIndex = column->varattno - 1;
|
||||||
|
@ -759,6 +767,58 @@ BuildBaseConstraint(Var *variable)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GetClauseVars extracts the Vars from the given clauses for the purpose of
|
||||||
|
* building constraints that can be refuted by predicate_refuted_by(). It also
|
||||||
|
* deduplicates and sorts them.
|
||||||
|
*/
|
||||||
|
static List *
|
||||||
|
GetClauseVars(List *whereClauseList, int natts)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* We don't recurse into or include aggregates, window functions, or
|
||||||
|
* PHVs. We don't expect any PHVs during execution; and Vars found inside
|
||||||
|
* an aggregate or window function aren't going to be useful in forming
|
||||||
|
* constraints that can be refuted.
|
||||||
|
*/
|
||||||
|
int flags = 0;
|
||||||
|
List *vars = pull_var_clause((Node *) whereClauseList, flags);
|
||||||
|
Var **deduplicate = palloc0(sizeof(Var *) * natts);
|
||||||
|
|
||||||
|
ListCell *lc;
|
||||||
|
foreach(lc, vars)
|
||||||
|
{
|
||||||
|
Node *node = lfirst(lc);
|
||||||
|
Assert(IsA(node, Var));
|
||||||
|
|
||||||
|
Var *var = (Var *) node;
|
||||||
|
int idx = var->varattno - 1;
|
||||||
|
|
||||||
|
if (deduplicate[idx] != NULL)
|
||||||
|
{
|
||||||
|
/* if they have the same varattno, the rest should be identical */
|
||||||
|
Assert(equal(var, deduplicate[idx]));
|
||||||
|
}
|
||||||
|
|
||||||
|
deduplicate[idx] = var;
|
||||||
|
}
|
||||||
|
|
||||||
|
List *whereClauseVars = NIL;
|
||||||
|
for (int i = 0; i < natts; i++)
|
||||||
|
{
|
||||||
|
Var *var = deduplicate[i];
|
||||||
|
if (var != NULL)
|
||||||
|
{
|
||||||
|
whereClauseVars = lappend(whereClauseVars, var);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pfree(deduplicate);
|
||||||
|
|
||||||
|
return whereClauseVars;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* MakeOpExpression builds an operator expression node. This operator expression
|
* MakeOpExpression builds an operator expression node. This operator expression
|
||||||
* implements the operator clause as defined by the variable and the strategy
|
* implements the operator clause as defined by the variable and the strategy
|
||||||
|
|
|
@ -101,3 +101,13 @@ EXPLAIN (analyze on, costs off, timing off, summary off)
|
||||||
SELECT count(*) FROM multi_column_chunk_filtering WHERE a > 50000 AND b > 50000;
|
SELECT count(*) FROM multi_column_chunk_filtering WHERE a > 50000 AND b > 50000;
|
||||||
|
|
||||||
DROP TABLE multi_column_chunk_filtering;
|
DROP TABLE multi_column_chunk_filtering;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- https://github.com/citusdata/citus/issues/4780
|
||||||
|
--
|
||||||
|
create table part_table (id int) partition by range (id);
|
||||||
|
create table part_1_row partition of part_table for values from (150000) to (160000);
|
||||||
|
create table part_2_columnar partition of part_table for values from (0) to (150000) using columnar;
|
||||||
|
insert into part_table select generate_series(1,159999);
|
||||||
|
select filtered_row_count('select count(*) from part_table where id > 75000');
|
||||||
|
drop table part_table;
|
||||||
|
|
|
@ -182,3 +182,17 @@ EXPLAIN (analyze on, costs off, timing off, summary off)
|
||||||
(5 rows)
|
(5 rows)
|
||||||
|
|
||||||
DROP TABLE multi_column_chunk_filtering;
|
DROP TABLE multi_column_chunk_filtering;
|
||||||
|
--
|
||||||
|
-- https://github.com/citusdata/citus/issues/4780
|
||||||
|
--
|
||||||
|
create table part_table (id int) partition by range (id);
|
||||||
|
create table part_1_row partition of part_table for values from (150000) to (160000);
|
||||||
|
create table part_2_columnar partition of part_table for values from (0) to (150000) using columnar;
|
||||||
|
insert into part_table select generate_series(1,159999);
|
||||||
|
select filtered_row_count('select count(*) from part_table where id > 75000');
|
||||||
|
filtered_row_count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
5000
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
drop table part_table;
|
||||||
|
|
Loading…
Reference in New Issue