From b06f48a2a79c38b18132374f8f797743e5a15c45 Mon Sep 17 00:00:00 2001 From: Jeff Davis Date: Fri, 11 Sep 2020 16:51:09 -0700 Subject: [PATCH] tableAM updates --- cstore_tableam.c | 171 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 161 insertions(+), 10 deletions(-) diff --git a/cstore_tableam.c b/cstore_tableam.c index e64243ce2..d67ac10b6 100644 --- a/cstore_tableam.c +++ b/cstore_tableam.c @@ -1,6 +1,5 @@ #include "postgres.h" -#include "cstore_tableam.h" #include #include "miscadmin.h" @@ -19,6 +18,7 @@ #include "catalog/storage_xlog.h" #include "commands/progress.h" #include "executor/executor.h" +#include "nodes/makefuncs.h" #include "optimizer/plancat.h" #include "pgstat.h" #include "storage/bufmgr.h" @@ -31,6 +31,57 @@ #include "utils/builtins.h" #include "utils/rel.h" +#include "cstore_tableam.h" +#include "cstore_fdw.h" + +typedef struct CStoreScanDescData +{ + TableScanDescData cs_base; + TableReadState *cs_readState; +} CStoreScanDescData; + +typedef struct CStoreScanDescData *CStoreScanDesc; + +static TableWriteState *CStoreWriteState = NULL; + +static void +cstore_init_write_state(Relation relation) +{ + //TODO: upgrade lock to serialize writes + + if (CStoreWriteState != NULL) + { + // TODO: consider whether it's possible for a new write to start + // before an old one is flushed + Assert(CStoreWriteState->relation->rd_id == relation->rd_id); + } + + if (CStoreWriteState == NULL) + { + CStoreFdwOptions *cstoreFdwOptions = CStoreGetOptions(relation->rd_id); + TupleDesc tupdesc = RelationGetDescr(relation); + + elog(NOTICE, "initializing write state for relation %d", relation->rd_id); + CStoreWriteState = CStoreBeginWrite(cstoreFdwOptions->filename, + cstoreFdwOptions->compressionType, + cstoreFdwOptions->stripeRowCount, + cstoreFdwOptions->blockRowCount, + tupdesc); + + CStoreWriteState->relation = relation; + } +} + +void +cstore_free_write_state() +{ + if (CStoreWriteState != NULL) + { + elog(NOTICE, "flushing write state for relation %d", CStoreWriteState->relation->rd_id); + CStoreEndWrite(CStoreWriteState); + CStoreWriteState = NULL; + } +} static const TupleTableSlotOps * cstore_slot_callbacks(Relation relation) @@ -44,13 +95,48 @@ cstore_beginscan(Relation relation, Snapshot snapshot, ParallelTableScanDesc parallel_scan, uint32 flags) { - elog(ERROR, "cstore_beginscan not implemented"); + TupleDesc tupdesc = relation->rd_att; + CStoreFdwOptions *cstoreFdwOptions = NULL; + TableReadState *readState = NULL; + CStoreScanDesc scan = palloc(sizeof(CStoreScanDescData)); + List *columnList = NIL; + + cstoreFdwOptions = CStoreGetOptions(relation->rd_id); + + scan->cs_base.rs_rd = relation; + scan->cs_base.rs_snapshot = snapshot; + scan->cs_base.rs_nkeys = nkeys; + scan->cs_base.rs_key = key; + scan->cs_base.rs_flags = flags; + scan->cs_base.rs_parallel = parallel_scan; + + for (int i = 0; i < tupdesc->natts; i++) + { + Index varno = 0; + AttrNumber varattno = i+1; + Oid vartype = tupdesc->attrs[i].atttypid; + int32 vartypmod = 0; + Oid varcollid = 0; + Index varlevelsup = 0; + Var *var = makeVar(varno, varattno, vartype, vartypmod, + varcollid, varlevelsup); + + columnList = lappend(columnList, var); + } + + readState = CStoreBeginRead(cstoreFdwOptions->filename, tupdesc, + columnList, NULL); + + scan->cs_readState = readState; + + return ((TableScanDesc) scan); } static void cstore_endscan(TableScanDesc sscan) { - elog(ERROR, "cstore_endscan not implemented"); + CStoreScanDesc scan = (CStoreScanDesc) sscan; + CStoreEndRead(scan->cs_readState); } static void @@ -63,7 +149,22 @@ cstore_rescan(TableScanDesc sscan, ScanKey key, bool set_params, static bool cstore_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot) { - elog(ERROR, "cstore_getnextslot not implemented"); + CStoreScanDesc scan = (CStoreScanDesc) sscan; + TupleDesc tupdesc = slot->tts_tupleDescriptor; + int natts = tupdesc->natts; + bool nextRowFound; + + ExecClearTuple(slot); + memset(slot->tts_values, 0, sizeof(Datum) * natts); + memset(slot->tts_isnull, true, sizeof(bool) * natts); + + nextRowFound = CStoreReadNextRow(scan->cs_readState, slot->tts_values, slot->tts_isnull); + + if (!nextRowFound) + return false; + + ExecStoreVirtualTuple(slot); + return true; } static Size @@ -153,7 +254,23 @@ static void cstore_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid, int options, BulkInsertState bistate) { - elog(ERROR, "cstore_tuple_insert not implemented"); + HeapTuple heapTuple; + + cstore_init_write_state(relation); + + heapTuple = GetSlotHeapTuple(slot); + if (HeapTupleHasExternal(heapTuple)) + { + /* detoast any toasted attributes */ + HeapTuple newTuple = toast_flatten_tuple(heapTuple, + slot->tts_tupleDescriptor); + + ExecForceStoreHeapTuple(newTuple, slot, true); + } + + slot_getallattrs(slot); + + CStoreWriteRow(CStoreWriteState, slot->tts_values, slot->tts_isnull); } static void @@ -175,7 +292,26 @@ static void cstore_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate) { - elog(ERROR, "cstore_multi_insert not implemented"); + cstore_init_write_state(relation); + + for (int i = 0; i < ntuples; i++) + { + TupleTableSlot *tupleSlot = slots[i]; + HeapTuple heapTuple = GetSlotHeapTuple(tupleSlot); + + if (HeapTupleHasExternal(heapTuple)) + { + /* detoast any toasted attributes */ + HeapTuple newTuple = toast_flatten_tuple(heapTuple, + tupleSlot->tts_tupleDescriptor); + + ExecForceStoreHeapTuple(newTuple, tupleSlot, true); + } + + slot_getallattrs(tupleSlot); + + CStoreWriteRow(CStoreWriteState, tupleSlot->tts_values, tupleSlot->tts_isnull); + } } static TM_Result @@ -207,7 +343,11 @@ cstore_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot, static void cstore_finish_bulk_insert(Relation relation, int options) { - elog(ERROR, "cstore_finish_bulk_insert not implemented"); + //TODO: flush relation like for heap? + // free write state or only in ExecutorEnd_hook? + + // for COPY + cstore_free_write_state(); } static void @@ -217,7 +357,15 @@ cstore_relation_set_new_filenode(Relation rel, TransactionId *freezeXid, MultiXactId *minmulti) { - elog(ERROR, "cstore_relation_set_new_filenode not implemented"); + SMgrRelation srel; + + Assert(persistence == RELPERSISTENCE_PERMANENT); + *freezeXid = RecentXmin; + *minmulti = GetOldestMultiXactId(); + srel = RelationCreateStorage(*newrnode, persistence); + CreateCStoreDatabaseDirectory(MyDatabaseId); + InitializeCStoreTableFile(rel->rd_id, rel); + smgrclose(srel); } static void @@ -295,7 +443,7 @@ cstore_relation_size(Relation rel, ForkNumber forkNumber) static bool cstore_relation_needs_toast_table(Relation rel) { - elog(ERROR, "cstore_relation_needs_toast_table not implemented"); + return false; } static void @@ -303,7 +451,10 @@ cstore_estimate_rel_size(Relation rel, int32 *attr_widths, BlockNumber *pages, double *tuples, double *allvisfrac) { - elog(ERROR, "cstore_estimate_rel_size not implemented"); + *attr_widths = 12; + *tuples = 100; + *pages = 10; + *allvisfrac = 1.0; } static bool