Projection pushdown (#11)

DESCRIPTION: add pushdown support for projections and quals in table access method scan

This implementation uses custom scans to push projections into the scans on a columnar table. The custom scan replaces all access paths to a table to force the projection of the columns.
merge-cstore-pykello
Nils Dijk 2020-10-13 13:36:02 +02:00 committed by GitHub
parent 685d5c9d4c
commit 5fc7f61936
9 changed files with 579 additions and 6 deletions

View File

@ -49,12 +49,12 @@ ifeq ($(USE_FDW),yes)
fdw_copyto fdw_alter fdw_rollback fdw_truncate fdw_clean
endif
# disabled tests: am_block_filtering
ifeq ($(USE_TABLEAM),yes)
PG_CFLAGS += -DUSE_TABLEAM
OBJS += cstore_tableam.o
OBJS += cstore_tableam.o cstore_customscan.o
REGRESS += am_create am_load am_query am_analyze am_data_types am_functions \
am_drop am_insert am_copyto am_alter am_rollback am_truncate am_vacuum am_clean
am_drop am_insert am_copyto am_alter am_rollback am_truncate am_vacuum am_clean \
am_block_filtering am_join
ISOLATION += am_vacuum_vs_insert
endif

View File

@ -267,6 +267,7 @@ extern TableReadState * CStoreBeginRead(Relation relation,
extern bool CStoreReadFinished(TableReadState *state);
extern bool CStoreReadNextRow(TableReadState *state, Datum *columnValues,
bool *columnNulls);
extern void CStoreRescan(TableReadState *readState);
extern void CStoreEndRead(TableReadState *state);
/* Function declarations for common functions */

426
cstore_customscan.c Normal file
View File

@ -0,0 +1,426 @@
/*-------------------------------------------------------------------------
*
* cstore_customscan.c
*
* This file contains the implementation of a postgres custom scan that
* we use to push down the projections into the table access methods.
*
* $Id$
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/skey.h"
#include "nodes/extensible.h"
#include "nodes/pg_list.h"
#include "nodes/plannodes.h"
#include "optimizer/optimizer.h"
#include "optimizer/pathnode.h"
#include "optimizer/paths.h"
#include "optimizer/restrictinfo.h"
#include "utils/relcache.h"
#include "cstore.h"
#include "cstore_customscan.h"
#include "cstore_tableam.h"
typedef struct CStoreScanPath
{
CustomPath custom_path;
/* place for local state during planning */
} CStoreScanPath;
typedef struct CStoreScanScan
{
CustomScan custom_scan;
/* place for local state during execution */
} CStoreScanScan;
typedef struct CStoreScanState
{
CustomScanState custom_scanstate;
List *qual;
} CStoreScanState;
static void CStoreSetRelPathlistHook(PlannerInfo *root, RelOptInfo *rel, Index rti,
RangeTblEntry *rte);
static Path * CreateCStoreScanPath(RelOptInfo *rel, RangeTblEntry *rte);
static Cost CStoreScanCost(RangeTblEntry *rte);
static Plan * CStoreScanPath_PlanCustomPath(PlannerInfo *root,
RelOptInfo *rel,
struct CustomPath *best_path,
List *tlist,
List *clauses,
List *custom_plans);
static Node * CStoreScan_CreateCustomScanState(CustomScan *cscan);
static void CStoreScan_BeginCustomScan(CustomScanState *node, EState *estate, int eflags);
static TupleTableSlot * CStoreScan_ExecCustomScan(CustomScanState *node);
static void CStoreScan_EndCustomScan(CustomScanState *node);
static void CStoreScan_ReScanCustomScan(CustomScanState *node);
/* saved hook value in case of unload */
static set_rel_pathlist_hook_type PreviousSetRelPathlistHook = NULL;
static bool EnableCStoreCustomScan = true;
const struct CustomPathMethods CStoreScanPathMethods = {
.CustomName = "CStoreScan",
.PlanCustomPath = CStoreScanPath_PlanCustomPath,
};
const struct CustomScanMethods CStoreScanScanMethods = {
.CustomName = "CStoreScan",
.CreateCustomScanState = CStoreScan_CreateCustomScanState,
};
const struct CustomExecMethods CStoreExecuteMethods = {
.CustomName = "CStoreScan",
.BeginCustomScan = CStoreScan_BeginCustomScan,
.ExecCustomScan = CStoreScan_ExecCustomScan,
.EndCustomScan = CStoreScan_EndCustomScan,
.ReScanCustomScan = CStoreScan_ReScanCustomScan,
.ExplainCustomScan = NULL,
};
/*
* cstore_customscan_init installs the hook required to intercept the postgres planner and
* provide extra paths for cstore tables
*/
void
cstore_customscan_init()
{
PreviousSetRelPathlistHook = set_rel_pathlist_hook;
set_rel_pathlist_hook = CStoreSetRelPathlistHook;
/* register customscan specific GUC's */
DefineCustomBoolVariable(
"cstore.enable_custom_scan",
gettext_noop("Enables the use of a custom scan to push projections and quals "
"into the storage layer"),
NULL,
&EnableCStoreCustomScan,
true,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
}
static void
clear_paths(RelOptInfo *rel)
{
rel->pathlist = NULL;
rel->partial_pathlist = NULL;
rel->cheapest_startup_path = NULL;
rel->cheapest_total_path = NULL;
rel->cheapest_unique_path = NULL;
}
static void
CStoreSetRelPathlistHook(PlannerInfo *root, RelOptInfo *rel, Index rti,
RangeTblEntry *rte)
{
/* call into previous hook if assigned */
if (PreviousSetRelPathlistHook)
{
PreviousSetRelPathlistHook(root, rel, rti, rte);
}
if (!EnableCStoreCustomScan)
{
/* custon scans are disabled, use normal table access method api instead */
return;
}
if (!OidIsValid(rte->relid))
{
/* some calls to the pathlist hook don't have a valid relation set. Do nothing */
return;
}
/*
* Here we want to inspect if this relation pathlist hook is accessing a cstore table.
* If that is the case we want to insert an extra path that pushes down the projection
* into the scan of the table to minimize the data read.
*/
Relation relation = RelationIdGetRelation(rte->relid);
if (relation->rd_tableam == GetCstoreTableAmRoutine())
{
ereport(DEBUG1, (errmsg("pathlist hook for cstore table am")));
/* we propose a new path that will be the only path for scanning this relation */
Path *customPath = CreateCStoreScanPath(rel, rte);
clear_paths(rel);
add_path(rel, customPath);
}
RelationClose(relation);
}
static Path *
CreateCStoreScanPath(RelOptInfo *rel, RangeTblEntry *rte)
{
CStoreScanPath *cspath = (CStoreScanPath *) newNode(sizeof(CStoreScanPath),
T_CustomPath);
/*
* popuate custom path information
*/
CustomPath *cpath = &cspath->custom_path;
cpath->methods = &CStoreScanPathMethods;
/*
* populate generic path information
*/
Path *path = &cpath->path;
path->pathtype = T_CustomScan;
path->parent = rel;
path->pathtarget = rel->reltarget;
/*
* Add cost estimates for a cstore table scan, row count is the rows estimated by
* postgres' planner.
*/
path->rows = rel->rows;
path->startup_cost = 0;
path->total_cost = path->startup_cost + CStoreScanCost(rte);
return (Path *) cspath;
}
/*
* CStoreScanCost calculates the cost of scanning the cstore table. The cost is estimated
* by using all stripe metadata to estimate based on the columns to read how many pages
* need to be read.
*/
static Cost
CStoreScanCost(RangeTblEntry *rte)
{
Relation rel = RelationIdGetRelation(rte->relid);
DataFileMetadata *metadata = ReadDataFileMetadata(rel->rd_node.relNode, false);
RelationClose(rel);
rel = NULL;
uint32 maxColumnCount = 0;
uint64 totalStripeSize = 0;
ListCell *stripeMetadataCell = NULL;
foreach(stripeMetadataCell, metadata->stripeMetadataList)
{
StripeMetadata *stripeMetadata = (StripeMetadata *) lfirst(stripeMetadataCell);
totalStripeSize += stripeMetadata->dataLength;
maxColumnCount = Max(maxColumnCount, stripeMetadata->columnCount);
}
Bitmapset *attr_needed = rte->selectedCols;
double numberOfColumnsRead = bms_num_members(attr_needed);
double selectionRatio = numberOfColumnsRead / (double) maxColumnCount;
Cost scanCost = (double) totalStripeSize / BLCKSZ * selectionRatio;
return scanCost;
}
static Plan *
CStoreScanPath_PlanCustomPath(PlannerInfo *root,
RelOptInfo *rel,
struct CustomPath *best_path,
List *tlist,
List *clauses,
List *custom_plans)
{
CStoreScanScan *plan = (CStoreScanScan *) newNode(sizeof(CStoreScanScan),
T_CustomScan);
CustomScan *cscan = &plan->custom_scan;
cscan->methods = &CStoreScanScanMethods;
/* Reduce RestrictInfo list to bare expressions; ignore pseudoconstants */
clauses = extract_actual_clauses(clauses, false);
cscan->scan.plan.targetlist = list_copy(tlist);
cscan->scan.plan.qual = clauses;
cscan->scan.scanrelid = best_path->path.parent->relid;
return (Plan *) plan;
}
static Node *
CStoreScan_CreateCustomScanState(CustomScan *cscan)
{
CStoreScanState *cstorescanstate = (CStoreScanState *) newNode(
sizeof(CStoreScanState), T_CustomScanState);
CustomScanState *cscanstate = &cstorescanstate->custom_scanstate;
cscanstate->methods = &CStoreExecuteMethods;
cstorescanstate->qual = cscan->scan.plan.qual;
return (Node *) cscanstate;
}
static void
CStoreScan_BeginCustomScan(CustomScanState *cscanstate, EState *estate, int eflags)
{
/* scan slot is already initialized */
}
static Bitmapset *
CStoreAttrNeeded(ScanState *ss)
{
TupleTableSlot *slot = ss->ss_ScanTupleSlot;
int natts = slot->tts_tupleDescriptor->natts;
Bitmapset *attr_needed = NULL;
Plan *plan = ss->ps.plan;
int flags = PVC_RECURSE_AGGREGATES |
PVC_RECURSE_WINDOWFUNCS | PVC_RECURSE_PLACEHOLDERS;
List *vars = list_concat(pull_var_clause((Node *) plan->targetlist, flags),
pull_var_clause((Node *) plan->qual, flags));
ListCell *lc;
foreach(lc, vars)
{
Var *var = lfirst(lc);
if (var->varattno == 0)
{
elog(DEBUG1, "Need attribute: all");
/* all attributes are required, we don't need to add more so break*/
attr_needed = bms_add_range(attr_needed, 0, natts - 1);
break;
}
elog(DEBUG1, "Need attribute: %d", var->varattno);
attr_needed = bms_add_member(attr_needed, var->varattno - 1);
}
return attr_needed;
}
static TupleTableSlot *
CStoreScanNext(CStoreScanState *cstorescanstate)
{
CustomScanState *node = (CustomScanState *) cstorescanstate;
TableScanDesc scandesc;
EState *estate;
ScanDirection direction;
TupleTableSlot *slot;
/*
* get information from the estate and scan state
*/
scandesc = node->ss.ss_currentScanDesc;
estate = node->ss.ps.state;
direction = estate->es_direction;
slot = node->ss.ss_ScanTupleSlot;
if (scandesc == NULL)
{
/* the cstore access method does not use the flags, they are specific to heap */
uint32 flags = 0;
Bitmapset *attr_needed = CStoreAttrNeeded(&node->ss);
/*
* We reach here if the scan is not parallel, or if we're serially
* executing a scan that was planned to be parallel.
*/
scandesc = cstore_beginscan_extended(node->ss.ss_currentRelation,
estate->es_snapshot,
0, NULL, NULL, flags, attr_needed,
cstorescanstate->qual);
bms_free(attr_needed);
node->ss.ss_currentScanDesc = scandesc;
}
/*
* get the next tuple from the table
*/
if (table_scan_getnextslot(scandesc, direction, slot))
{
return slot;
}
return NULL;
}
/*
* SeqRecheck -- access method routine to recheck a tuple in EvalPlanQual
*/
static bool
CStoreScanRecheck(CStoreScanState *node, TupleTableSlot *slot)
{
return true;
}
static TupleTableSlot *
CStoreScan_ExecCustomScan(CustomScanState *node)
{
return ExecScan(&node->ss,
(ExecScanAccessMtd) CStoreScanNext,
(ExecScanRecheckMtd) CStoreScanRecheck);
}
static void
CStoreScan_EndCustomScan(CustomScanState *node)
{
TableScanDesc scanDesc;
/*
* get information from node
*/
scanDesc = node->ss.ss_currentScanDesc;
/*
* Free the exprcontext
*/
ExecFreeExprContext(&node->ss.ps);
/*
* clean out the tuple table
*/
if (node->ss.ps.ps_ResultTupleSlot)
{
ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
}
ExecClearTuple(node->ss.ss_ScanTupleSlot);
/*
* close heap scan
*/
if (scanDesc != NULL)
{
table_endscan(scanDesc);
}
}
static void
CStoreScan_ReScanCustomScan(CustomScanState *node)
{
TableScanDesc scanDesc = node->ss.ss_currentScanDesc;
if (scanDesc != NULL)
{
table_rescan(node->ss.ss_currentScanDesc, NULL);
}
}

19
cstore_customscan.h Normal file
View File

@ -0,0 +1,19 @@
/*-------------------------------------------------------------------------
*
* cstore_customscan.h
*
* Forward declarations of functions to hookup the custom scan feature of
* cstore.
*
* $Id$
*
*-------------------------------------------------------------------------
*/
#ifndef CSTORE_FDW_CSTORE_CUSTOMSCAN_H
#define CSTORE_FDW_CSTORE_CUSTOMSCAN_H
void cstore_customscan_init(void);
#endif /*CSTORE_FDW_CSTORE_CUSTOMSCAN_H */

View File

@ -29,6 +29,7 @@
#endif
#include "optimizer/restrictinfo.h"
#include "storage/fd.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
@ -222,6 +223,19 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu
}
/*
* CStoreRescan clears the position where we were scanning so that the next read starts at
* the beginning again
*/
void
CStoreRescan(TableReadState *readState)
{
readState->stripeBuffers = NULL;
readState->readStripeCount = 0;
readState->stripeReadRowCount = 0;
}
/* Finishes a cstore read operation. */
void
CStoreEndRead(TableReadState *readState)

View File

@ -37,6 +37,7 @@
#include "utils/syscache.h"
#include "cstore.h"
#include "cstore_customscan.h"
#include "cstore_tableam.h"
#define CSTORE_TABLEAM_NAME "cstore_tableam"
@ -154,7 +155,7 @@ RelationColumnList(Relation rel)
for (int i = 0; i < tupdesc->natts; i++)
{
Index varno = 0;
Index varno = 1;
AttrNumber varattno = i + 1;
Oid vartype = tupdesc->attrs[i].atttypid;
int32 vartypmod = tupdesc->attrs[i].atttypmod;
@ -188,11 +189,36 @@ cstore_beginscan(Relation relation, Snapshot snapshot,
int nkeys, ScanKey key,
ParallelTableScanDesc parallel_scan,
uint32 flags)
{
TableScanDesc scandesc;
int natts = relation->rd_att->natts;
Bitmapset *attr_needed = NULL;
attr_needed = bms_add_range(attr_needed, 0, natts - 1);
/* the cstore access method does not use the flags, they are specific to heap */
flags = 0;
scandesc = cstore_beginscan_extended(relation, snapshot, nkeys, key, parallel_scan,
flags, attr_needed, NULL);
pfree(attr_needed);
return scandesc;
}
TableScanDesc
cstore_beginscan_extended(Relation relation, Snapshot snapshot,
int nkeys, ScanKey key,
ParallelTableScanDesc parallel_scan,
uint32 flags, Bitmapset *attr_needed, List *scanQual)
{
TupleDesc tupdesc = relation->rd_att;
TableReadState *readState = NULL;
CStoreScanDesc scan = palloc(sizeof(CStoreScanDescData));
List *columnList = NIL;
List *neededColumnList = NIL;
MemoryContext oldContext = MemoryContextSwitchTo(GetCStoreMemoryContext());
scan->cs_base.rs_rd = relation;
@ -204,7 +230,18 @@ cstore_beginscan(Relation relation, Snapshot snapshot,
columnList = RelationColumnList(relation);
readState = CStoreBeginRead(relation, tupdesc, columnList, NULL);
/* only collect columns that we need for the scan */
ListCell *columnCell = NULL;
foreach(columnCell, columnList)
{
Var *var = castNode(Var, lfirst(columnCell));
if (bms_is_member(var->varattno - 1, attr_needed))
{
neededColumnList = lappend(neededColumnList, var);
}
}
readState = CStoreBeginRead(relation, tupdesc, neededColumnList, scanQual);
scan->cs_readState = readState;
@ -226,7 +263,8 @@ static void
cstore_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
bool allow_strat, bool allow_sync, bool allow_pagemode)
{
elog(ERROR, "cstore_rescan not implemented");
CStoreScanDesc scan = (CStoreScanDesc) sscan;
CStoreRescan(scan->cs_readState);
}
@ -996,6 +1034,8 @@ cstore_tableam_init()
ExecutorEnd_hook = CStoreExecutorEnd;
prevObjectAccessHook = object_access_hook;
object_access_hook = CStoreTableAMObjectAccessHook;
cstore_customscan_init();
}

View File

@ -1,7 +1,15 @@
#include "postgres.h"
#include "fmgr.h"
#include "access/tableam.h"
#include "access/skey.h"
#include "nodes/bitmapset.h"
const TableAmRoutine * GetCstoreTableAmRoutine(void);
extern void cstore_tableam_init(void);
extern void cstore_tableam_finish(void);
extern TableScanDesc cstore_beginscan_extended(Relation relation, Snapshot snapshot,
int nkeys, ScanKey key,
ParallelTableScanDesc parallel_scan,
uint32 flags, Bitmapset *attr_needed,
List *scanQual);

37
expected/am_join.out Normal file
View File

@ -0,0 +1,37 @@
CREATE SCHEMA am_cstore_join;
SET search_path TO am_cstore_join;
CREATE TABLE users (id int, name text) USING cstore_tableam;
INSERT INTO users SELECT a, 'name' || a FROM generate_series(0,30-1) AS a;
CREATE TABLE things (id int, user_id int, name text) USING cstore_tableam;
INSERT INTO things SELECT a, a % 30, 'thing' || a FROM generate_series(1,300) AS a;
-- force the nested loop to rescan the table
SET enable_material TO off;
SET enable_hashjoin TO off;
SET enable_mergejoin TO off;
SELECT count(*)
FROM users
JOIN things ON (users.id = things.user_id)
WHERE things.id > 290;
count
-------
10
(1 row)
-- verify the join uses a nested loop to trigger the rescan behaviour
EXPLAIN (COSTS OFF)
SELECT count(*)
FROM users
JOIN things ON (users.id = things.user_id)
WHERE things.id > 299990;
QUERY PLAN
--------------------------------------------------
Aggregate
-> Nested Loop
Join Filter: (users.id = things.user_id)
-> Custom Scan (CStoreScan) on things
Filter: (id > 299990)
-> Custom Scan (CStoreScan) on users
(6 rows)
SET client_min_messages TO warning;
DROP SCHEMA am_cstore_join CASCADE;

28
sql/am_join.sql Normal file
View File

@ -0,0 +1,28 @@
CREATE SCHEMA am_cstore_join;
SET search_path TO am_cstore_join;
CREATE TABLE users (id int, name text) USING cstore_tableam;
INSERT INTO users SELECT a, 'name' || a FROM generate_series(0,30-1) AS a;
CREATE TABLE things (id int, user_id int, name text) USING cstore_tableam;
INSERT INTO things SELECT a, a % 30, 'thing' || a FROM generate_series(1,300) AS a;
-- force the nested loop to rescan the table
SET enable_material TO off;
SET enable_hashjoin TO off;
SET enable_mergejoin TO off;
SELECT count(*)
FROM users
JOIN things ON (users.id = things.user_id)
WHERE things.id > 290;
-- verify the join uses a nested loop to trigger the rescan behaviour
EXPLAIN (COSTS OFF)
SELECT count(*)
FROM users
JOIN things ON (users.id = things.user_id)
WHERE things.id > 299990;
SET client_min_messages TO warning;
DROP SCHEMA am_cstore_join CASCADE;