From 5fc7f61936367dce31179b5b6dc2b83eb61e61ba Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Tue, 13 Oct 2020 13:36:02 +0200 Subject: [PATCH] Projection pushdown (#11) DESCRIPTION: add pushdown support for projections and quals in table access method scan This implementation uses custom scans to push projections into the scans on a columnar table. The custom scan replaces all access paths to a table to force the projection of the columns. --- Makefile | 6 +- cstore.h | 1 + cstore_customscan.c | 426 +++++++++++++++++++++++++++++++++++++++++++ cstore_customscan.h | 19 ++ cstore_reader.c | 14 ++ cstore_tableam.c | 46 ++++- cstore_tableam.h | 8 + expected/am_join.out | 37 ++++ sql/am_join.sql | 28 +++ 9 files changed, 579 insertions(+), 6 deletions(-) create mode 100644 cstore_customscan.c create mode 100644 cstore_customscan.h create mode 100644 expected/am_join.out create mode 100644 sql/am_join.sql diff --git a/Makefile b/Makefile index 58340450f..7e8bee13a 100644 --- a/Makefile +++ b/Makefile @@ -49,12 +49,12 @@ ifeq ($(USE_FDW),yes) fdw_copyto fdw_alter fdw_rollback fdw_truncate fdw_clean endif -# disabled tests: am_block_filtering ifeq ($(USE_TABLEAM),yes) PG_CFLAGS += -DUSE_TABLEAM - OBJS += cstore_tableam.o + OBJS += cstore_tableam.o cstore_customscan.o REGRESS += am_create am_load am_query am_analyze am_data_types am_functions \ - am_drop am_insert am_copyto am_alter am_rollback am_truncate am_vacuum am_clean + am_drop am_insert am_copyto am_alter am_rollback am_truncate am_vacuum am_clean \ + am_block_filtering am_join ISOLATION += am_vacuum_vs_insert endif diff --git a/cstore.h b/cstore.h index 8a64730c8..f5e0590a8 100644 --- a/cstore.h +++ b/cstore.h @@ -267,6 +267,7 @@ extern TableReadState * CStoreBeginRead(Relation relation, extern bool CStoreReadFinished(TableReadState *state); extern bool CStoreReadNextRow(TableReadState *state, Datum *columnValues, bool *columnNulls); +extern void CStoreRescan(TableReadState *readState); extern void CStoreEndRead(TableReadState *state); /* Function declarations for common functions */ diff --git a/cstore_customscan.c b/cstore_customscan.c new file mode 100644 index 000000000..0dcdff111 --- /dev/null +++ b/cstore_customscan.c @@ -0,0 +1,426 @@ +/*------------------------------------------------------------------------- + * + * cstore_customscan.c + * + * This file contains the implementation of a postgres custom scan that + * we use to push down the projections into the table access methods. + * + * $Id$ + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/skey.h" +#include "nodes/extensible.h" +#include "nodes/pg_list.h" +#include "nodes/plannodes.h" +#include "optimizer/optimizer.h" +#include "optimizer/pathnode.h" +#include "optimizer/paths.h" +#include "optimizer/restrictinfo.h" +#include "utils/relcache.h" + +#include "cstore.h" +#include "cstore_customscan.h" +#include "cstore_tableam.h" + +typedef struct CStoreScanPath +{ + CustomPath custom_path; + + /* place for local state during planning */ +} CStoreScanPath; + +typedef struct CStoreScanScan +{ + CustomScan custom_scan; + + /* place for local state during execution */ +} CStoreScanScan; + +typedef struct CStoreScanState +{ + CustomScanState custom_scanstate; + + List *qual; +} CStoreScanState; + + +static void CStoreSetRelPathlistHook(PlannerInfo *root, RelOptInfo *rel, Index rti, + RangeTblEntry *rte); +static Path * CreateCStoreScanPath(RelOptInfo *rel, RangeTblEntry *rte); +static Cost CStoreScanCost(RangeTblEntry *rte); +static Plan * CStoreScanPath_PlanCustomPath(PlannerInfo *root, + RelOptInfo *rel, + struct CustomPath *best_path, + List *tlist, + List *clauses, + List *custom_plans); + +static Node * CStoreScan_CreateCustomScanState(CustomScan *cscan); + +static void CStoreScan_BeginCustomScan(CustomScanState *node, EState *estate, int eflags); +static TupleTableSlot * CStoreScan_ExecCustomScan(CustomScanState *node); +static void CStoreScan_EndCustomScan(CustomScanState *node); +static void CStoreScan_ReScanCustomScan(CustomScanState *node); + +/* saved hook value in case of unload */ +static set_rel_pathlist_hook_type PreviousSetRelPathlistHook = NULL; + +static bool EnableCStoreCustomScan = true; + + +const struct CustomPathMethods CStoreScanPathMethods = { + .CustomName = "CStoreScan", + .PlanCustomPath = CStoreScanPath_PlanCustomPath, +}; + +const struct CustomScanMethods CStoreScanScanMethods = { + .CustomName = "CStoreScan", + .CreateCustomScanState = CStoreScan_CreateCustomScanState, +}; + +const struct CustomExecMethods CStoreExecuteMethods = { + .CustomName = "CStoreScan", + + .BeginCustomScan = CStoreScan_BeginCustomScan, + .ExecCustomScan = CStoreScan_ExecCustomScan, + .EndCustomScan = CStoreScan_EndCustomScan, + .ReScanCustomScan = CStoreScan_ReScanCustomScan, + + .ExplainCustomScan = NULL, +}; + + +/* + * cstore_customscan_init installs the hook required to intercept the postgres planner and + * provide extra paths for cstore tables + */ +void +cstore_customscan_init() +{ + PreviousSetRelPathlistHook = set_rel_pathlist_hook; + set_rel_pathlist_hook = CStoreSetRelPathlistHook; + + /* register customscan specific GUC's */ + DefineCustomBoolVariable( + "cstore.enable_custom_scan", + gettext_noop("Enables the use of a custom scan to push projections and quals " + "into the storage layer"), + NULL, + &EnableCStoreCustomScan, + true, + PGC_USERSET, + GUC_NO_SHOW_ALL, + NULL, NULL, NULL); +} + + +static void +clear_paths(RelOptInfo *rel) +{ + rel->pathlist = NULL; + rel->partial_pathlist = NULL; + rel->cheapest_startup_path = NULL; + rel->cheapest_total_path = NULL; + rel->cheapest_unique_path = NULL; +} + + +static void +CStoreSetRelPathlistHook(PlannerInfo *root, RelOptInfo *rel, Index rti, + RangeTblEntry *rte) +{ + /* call into previous hook if assigned */ + if (PreviousSetRelPathlistHook) + { + PreviousSetRelPathlistHook(root, rel, rti, rte); + } + + if (!EnableCStoreCustomScan) + { + /* custon scans are disabled, use normal table access method api instead */ + return; + } + + if (!OidIsValid(rte->relid)) + { + /* some calls to the pathlist hook don't have a valid relation set. Do nothing */ + return; + } + + /* + * Here we want to inspect if this relation pathlist hook is accessing a cstore table. + * If that is the case we want to insert an extra path that pushes down the projection + * into the scan of the table to minimize the data read. + */ + Relation relation = RelationIdGetRelation(rte->relid); + if (relation->rd_tableam == GetCstoreTableAmRoutine()) + { + ereport(DEBUG1, (errmsg("pathlist hook for cstore table am"))); + + /* we propose a new path that will be the only path for scanning this relation */ + Path *customPath = CreateCStoreScanPath(rel, rte); + clear_paths(rel); + add_path(rel, customPath); + } + RelationClose(relation); +} + + +static Path * +CreateCStoreScanPath(RelOptInfo *rel, RangeTblEntry *rte) +{ + CStoreScanPath *cspath = (CStoreScanPath *) newNode(sizeof(CStoreScanPath), + T_CustomPath); + + /* + * popuate custom path information + */ + CustomPath *cpath = &cspath->custom_path; + cpath->methods = &CStoreScanPathMethods; + + /* + * populate generic path information + */ + Path *path = &cpath->path; + path->pathtype = T_CustomScan; + path->parent = rel; + path->pathtarget = rel->reltarget; + + /* + * Add cost estimates for a cstore table scan, row count is the rows estimated by + * postgres' planner. + */ + path->rows = rel->rows; + path->startup_cost = 0; + path->total_cost = path->startup_cost + CStoreScanCost(rte); + + return (Path *) cspath; +} + + +/* + * CStoreScanCost calculates the cost of scanning the cstore table. The cost is estimated + * by using all stripe metadata to estimate based on the columns to read how many pages + * need to be read. + */ +static Cost +CStoreScanCost(RangeTblEntry *rte) +{ + Relation rel = RelationIdGetRelation(rte->relid); + DataFileMetadata *metadata = ReadDataFileMetadata(rel->rd_node.relNode, false); + RelationClose(rel); + rel = NULL; + + uint32 maxColumnCount = 0; + uint64 totalStripeSize = 0; + ListCell *stripeMetadataCell = NULL; + foreach(stripeMetadataCell, metadata->stripeMetadataList) + { + StripeMetadata *stripeMetadata = (StripeMetadata *) lfirst(stripeMetadataCell); + totalStripeSize += stripeMetadata->dataLength; + maxColumnCount = Max(maxColumnCount, stripeMetadata->columnCount); + } + + Bitmapset *attr_needed = rte->selectedCols; + double numberOfColumnsRead = bms_num_members(attr_needed); + double selectionRatio = numberOfColumnsRead / (double) maxColumnCount; + Cost scanCost = (double) totalStripeSize / BLCKSZ * selectionRatio; + + return scanCost; +} + + +static Plan * +CStoreScanPath_PlanCustomPath(PlannerInfo *root, + RelOptInfo *rel, + struct CustomPath *best_path, + List *tlist, + List *clauses, + List *custom_plans) +{ + CStoreScanScan *plan = (CStoreScanScan *) newNode(sizeof(CStoreScanScan), + T_CustomScan); + + CustomScan *cscan = &plan->custom_scan; + cscan->methods = &CStoreScanScanMethods; + + /* Reduce RestrictInfo list to bare expressions; ignore pseudoconstants */ + clauses = extract_actual_clauses(clauses, false); + + cscan->scan.plan.targetlist = list_copy(tlist); + cscan->scan.plan.qual = clauses; + cscan->scan.scanrelid = best_path->path.parent->relid; + + return (Plan *) plan; +} + + +static Node * +CStoreScan_CreateCustomScanState(CustomScan *cscan) +{ + CStoreScanState *cstorescanstate = (CStoreScanState *) newNode( + sizeof(CStoreScanState), T_CustomScanState); + + CustomScanState *cscanstate = &cstorescanstate->custom_scanstate; + cscanstate->methods = &CStoreExecuteMethods; + + cstorescanstate->qual = cscan->scan.plan.qual; + + return (Node *) cscanstate; +} + + +static void +CStoreScan_BeginCustomScan(CustomScanState *cscanstate, EState *estate, int eflags) +{ + /* scan slot is already initialized */ +} + + +static Bitmapset * +CStoreAttrNeeded(ScanState *ss) +{ + TupleTableSlot *slot = ss->ss_ScanTupleSlot; + int natts = slot->tts_tupleDescriptor->natts; + Bitmapset *attr_needed = NULL; + Plan *plan = ss->ps.plan; + int flags = PVC_RECURSE_AGGREGATES | + PVC_RECURSE_WINDOWFUNCS | PVC_RECURSE_PLACEHOLDERS; + List *vars = list_concat(pull_var_clause((Node *) plan->targetlist, flags), + pull_var_clause((Node *) plan->qual, flags)); + ListCell *lc; + + foreach(lc, vars) + { + Var *var = lfirst(lc); + + if (var->varattno == 0) + { + elog(DEBUG1, "Need attribute: all"); + + /* all attributes are required, we don't need to add more so break*/ + attr_needed = bms_add_range(attr_needed, 0, natts - 1); + break; + } + + elog(DEBUG1, "Need attribute: %d", var->varattno); + attr_needed = bms_add_member(attr_needed, var->varattno - 1); + } + + return attr_needed; +} + + +static TupleTableSlot * +CStoreScanNext(CStoreScanState *cstorescanstate) +{ + CustomScanState *node = (CustomScanState *) cstorescanstate; + TableScanDesc scandesc; + EState *estate; + ScanDirection direction; + TupleTableSlot *slot; + + /* + * get information from the estate and scan state + */ + scandesc = node->ss.ss_currentScanDesc; + estate = node->ss.ps.state; + direction = estate->es_direction; + slot = node->ss.ss_ScanTupleSlot; + + if (scandesc == NULL) + { + /* the cstore access method does not use the flags, they are specific to heap */ + uint32 flags = 0; + Bitmapset *attr_needed = CStoreAttrNeeded(&node->ss); + + /* + * We reach here if the scan is not parallel, or if we're serially + * executing a scan that was planned to be parallel. + */ + scandesc = cstore_beginscan_extended(node->ss.ss_currentRelation, + estate->es_snapshot, + 0, NULL, NULL, flags, attr_needed, + cstorescanstate->qual); + bms_free(attr_needed); + + node->ss.ss_currentScanDesc = scandesc; + } + + /* + * get the next tuple from the table + */ + if (table_scan_getnextslot(scandesc, direction, slot)) + { + return slot; + } + return NULL; +} + + +/* + * SeqRecheck -- access method routine to recheck a tuple in EvalPlanQual + */ +static bool +CStoreScanRecheck(CStoreScanState *node, TupleTableSlot *slot) +{ + return true; +} + + +static TupleTableSlot * +CStoreScan_ExecCustomScan(CustomScanState *node) +{ + return ExecScan(&node->ss, + (ExecScanAccessMtd) CStoreScanNext, + (ExecScanRecheckMtd) CStoreScanRecheck); +} + + +static void +CStoreScan_EndCustomScan(CustomScanState *node) +{ + TableScanDesc scanDesc; + + /* + * get information from node + */ + scanDesc = node->ss.ss_currentScanDesc; + + /* + * Free the exprcontext + */ + ExecFreeExprContext(&node->ss.ps); + + /* + * clean out the tuple table + */ + if (node->ss.ps.ps_ResultTupleSlot) + { + ExecClearTuple(node->ss.ps.ps_ResultTupleSlot); + } + ExecClearTuple(node->ss.ss_ScanTupleSlot); + + /* + * close heap scan + */ + if (scanDesc != NULL) + { + table_endscan(scanDesc); + } +} + + +static void +CStoreScan_ReScanCustomScan(CustomScanState *node) +{ + TableScanDesc scanDesc = node->ss.ss_currentScanDesc; + if (scanDesc != NULL) + { + table_rescan(node->ss.ss_currentScanDesc, NULL); + } +} diff --git a/cstore_customscan.h b/cstore_customscan.h new file mode 100644 index 000000000..9e388e13f --- /dev/null +++ b/cstore_customscan.h @@ -0,0 +1,19 @@ +/*------------------------------------------------------------------------- + * + * cstore_customscan.h + * + * Forward declarations of functions to hookup the custom scan feature of + * cstore. + * + * $Id$ + * + *------------------------------------------------------------------------- + */ + +#ifndef CSTORE_FDW_CSTORE_CUSTOMSCAN_H +#define CSTORE_FDW_CSTORE_CUSTOMSCAN_H + +void cstore_customscan_init(void); + + +#endif /*CSTORE_FDW_CSTORE_CUSTOMSCAN_H */ diff --git a/cstore_reader.c b/cstore_reader.c index cf2d0b171..c86021f7e 100644 --- a/cstore_reader.c +++ b/cstore_reader.c @@ -29,6 +29,7 @@ #endif #include "optimizer/restrictinfo.h" #include "storage/fd.h" +#include "utils/guc.h" #include "utils/memutils.h" #include "utils/lsyscache.h" #include "utils/rel.h" @@ -222,6 +223,19 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu } +/* + * CStoreRescan clears the position where we were scanning so that the next read starts at + * the beginning again + */ +void +CStoreRescan(TableReadState *readState) +{ + readState->stripeBuffers = NULL; + readState->readStripeCount = 0; + readState->stripeReadRowCount = 0; +} + + /* Finishes a cstore read operation. */ void CStoreEndRead(TableReadState *readState) diff --git a/cstore_tableam.c b/cstore_tableam.c index 0840436ec..eae806e59 100644 --- a/cstore_tableam.c +++ b/cstore_tableam.c @@ -37,6 +37,7 @@ #include "utils/syscache.h" #include "cstore.h" +#include "cstore_customscan.h" #include "cstore_tableam.h" #define CSTORE_TABLEAM_NAME "cstore_tableam" @@ -154,7 +155,7 @@ RelationColumnList(Relation rel) for (int i = 0; i < tupdesc->natts; i++) { - Index varno = 0; + Index varno = 1; AttrNumber varattno = i + 1; Oid vartype = tupdesc->attrs[i].atttypid; int32 vartypmod = tupdesc->attrs[i].atttypmod; @@ -188,11 +189,36 @@ cstore_beginscan(Relation relation, Snapshot snapshot, int nkeys, ScanKey key, ParallelTableScanDesc parallel_scan, uint32 flags) +{ + TableScanDesc scandesc; + int natts = relation->rd_att->natts; + Bitmapset *attr_needed = NULL; + + attr_needed = bms_add_range(attr_needed, 0, natts - 1); + + /* the cstore access method does not use the flags, they are specific to heap */ + flags = 0; + + scandesc = cstore_beginscan_extended(relation, snapshot, nkeys, key, parallel_scan, + flags, attr_needed, NULL); + + pfree(attr_needed); + + return scandesc; +} + + +TableScanDesc +cstore_beginscan_extended(Relation relation, Snapshot snapshot, + int nkeys, ScanKey key, + ParallelTableScanDesc parallel_scan, + uint32 flags, Bitmapset *attr_needed, List *scanQual) { TupleDesc tupdesc = relation->rd_att; TableReadState *readState = NULL; CStoreScanDesc scan = palloc(sizeof(CStoreScanDescData)); List *columnList = NIL; + List *neededColumnList = NIL; MemoryContext oldContext = MemoryContextSwitchTo(GetCStoreMemoryContext()); scan->cs_base.rs_rd = relation; @@ -204,7 +230,18 @@ cstore_beginscan(Relation relation, Snapshot snapshot, columnList = RelationColumnList(relation); - readState = CStoreBeginRead(relation, tupdesc, columnList, NULL); + /* only collect columns that we need for the scan */ + ListCell *columnCell = NULL; + foreach(columnCell, columnList) + { + Var *var = castNode(Var, lfirst(columnCell)); + if (bms_is_member(var->varattno - 1, attr_needed)) + { + neededColumnList = lappend(neededColumnList, var); + } + } + + readState = CStoreBeginRead(relation, tupdesc, neededColumnList, scanQual); scan->cs_readState = readState; @@ -226,7 +263,8 @@ static void cstore_rescan(TableScanDesc sscan, ScanKey key, bool set_params, bool allow_strat, bool allow_sync, bool allow_pagemode) { - elog(ERROR, "cstore_rescan not implemented"); + CStoreScanDesc scan = (CStoreScanDesc) sscan; + CStoreRescan(scan->cs_readState); } @@ -996,6 +1034,8 @@ cstore_tableam_init() ExecutorEnd_hook = CStoreExecutorEnd; prevObjectAccessHook = object_access_hook; object_access_hook = CStoreTableAMObjectAccessHook; + + cstore_customscan_init(); } diff --git a/cstore_tableam.h b/cstore_tableam.h index bdf7f96c0..557506b9f 100644 --- a/cstore_tableam.h +++ b/cstore_tableam.h @@ -1,7 +1,15 @@ #include "postgres.h" #include "fmgr.h" #include "access/tableam.h" +#include "access/skey.h" +#include "nodes/bitmapset.h" const TableAmRoutine * GetCstoreTableAmRoutine(void); extern void cstore_tableam_init(void); extern void cstore_tableam_finish(void); + +extern TableScanDesc cstore_beginscan_extended(Relation relation, Snapshot snapshot, + int nkeys, ScanKey key, + ParallelTableScanDesc parallel_scan, + uint32 flags, Bitmapset *attr_needed, + List *scanQual); diff --git a/expected/am_join.out b/expected/am_join.out new file mode 100644 index 000000000..fbb628187 --- /dev/null +++ b/expected/am_join.out @@ -0,0 +1,37 @@ +CREATE SCHEMA am_cstore_join; +SET search_path TO am_cstore_join; +CREATE TABLE users (id int, name text) USING cstore_tableam; +INSERT INTO users SELECT a, 'name' || a FROM generate_series(0,30-1) AS a; +CREATE TABLE things (id int, user_id int, name text) USING cstore_tableam; +INSERT INTO things SELECT a, a % 30, 'thing' || a FROM generate_series(1,300) AS a; +-- force the nested loop to rescan the table +SET enable_material TO off; +SET enable_hashjoin TO off; +SET enable_mergejoin TO off; +SELECT count(*) +FROM users +JOIN things ON (users.id = things.user_id) +WHERE things.id > 290; + count +------- + 10 +(1 row) + +-- verify the join uses a nested loop to trigger the rescan behaviour +EXPLAIN (COSTS OFF) +SELECT count(*) +FROM users +JOIN things ON (users.id = things.user_id) +WHERE things.id > 299990; + QUERY PLAN +-------------------------------------------------- + Aggregate + -> Nested Loop + Join Filter: (users.id = things.user_id) + -> Custom Scan (CStoreScan) on things + Filter: (id > 299990) + -> Custom Scan (CStoreScan) on users +(6 rows) + +SET client_min_messages TO warning; +DROP SCHEMA am_cstore_join CASCADE; diff --git a/sql/am_join.sql b/sql/am_join.sql new file mode 100644 index 000000000..4d78dfe5b --- /dev/null +++ b/sql/am_join.sql @@ -0,0 +1,28 @@ +CREATE SCHEMA am_cstore_join; +SET search_path TO am_cstore_join; + +CREATE TABLE users (id int, name text) USING cstore_tableam; +INSERT INTO users SELECT a, 'name' || a FROM generate_series(0,30-1) AS a; + +CREATE TABLE things (id int, user_id int, name text) USING cstore_tableam; +INSERT INTO things SELECT a, a % 30, 'thing' || a FROM generate_series(1,300) AS a; + +-- force the nested loop to rescan the table +SET enable_material TO off; +SET enable_hashjoin TO off; +SET enable_mergejoin TO off; + +SELECT count(*) +FROM users +JOIN things ON (users.id = things.user_id) +WHERE things.id > 290; + +-- verify the join uses a nested loop to trigger the rescan behaviour +EXPLAIN (COSTS OFF) +SELECT count(*) +FROM users +JOIN things ON (users.id = things.user_id) +WHERE things.id > 299990; + +SET client_min_messages TO warning; +DROP SCHEMA am_cstore_join CASCADE;