fixup rebase

merge-cstore-pykello
Jeff Davis 2020-09-15 09:01:38 -07:00
parent b06f48a2a7
commit aa422f2da0
3 changed files with 29 additions and 20 deletions

View File

@ -252,8 +252,6 @@ typedef struct TableWriteState
extern CompressionType ParseCompressionType(const char *compressionTypeString); extern CompressionType ParseCompressionType(const char *compressionTypeString);
extern void InitializeCStoreTableFile(Oid relationId, Relation relation, extern void InitializeCStoreTableFile(Oid relationId, Relation relation,
CStoreOptions *cstoreOptions); CStoreOptions *cstoreOptions);
extern void CreateCStoreDatabaseDirectory(Oid databaseOid);
extern void RemoveCStoreDatabaseDirectory(Oid databaseOid);
/* Function declarations for writing to a cstore file */ /* Function declarations for writing to a cstore file */
extern TableWriteState * CStoreBeginWrite(Oid relationId, extern TableWriteState * CStoreBeginWrite(Oid relationId,

View File

@ -31,8 +31,8 @@
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/rel.h" #include "utils/rel.h"
#include "cstore.h"
#include "cstore_tableam.h" #include "cstore_tableam.h"
#include "cstore_fdw.h"
typedef struct CStoreScanDescData typedef struct CStoreScanDescData
{ {
@ -44,6 +44,16 @@ typedef struct CStoreScanDescData *CStoreScanDesc;
static TableWriteState *CStoreWriteState = NULL; static TableWriteState *CStoreWriteState = NULL;
static CStoreOptions *
CStoreGetDefaultOptions(void)
{
CStoreOptions *cstoreOptions = palloc0(sizeof(CStoreOptions));
cstoreOptions->compressionType = DEFAULT_COMPRESSION_TYPE;
cstoreOptions->stripeRowCount = DEFAULT_STRIPE_ROW_COUNT;
cstoreOptions->blockRowCount = DEFAULT_BLOCK_ROW_COUNT;
return cstoreOptions;
}
static void static void
cstore_init_write_state(Relation relation) cstore_init_write_state(Relation relation)
{ {
@ -58,14 +68,14 @@ cstore_init_write_state(Relation relation)
if (CStoreWriteState == NULL) if (CStoreWriteState == NULL)
{ {
CStoreFdwOptions *cstoreFdwOptions = CStoreGetOptions(relation->rd_id); CStoreOptions *cstoreOptions = CStoreGetDefaultOptions();
TupleDesc tupdesc = RelationGetDescr(relation); TupleDesc tupdesc = RelationGetDescr(relation);
elog(NOTICE, "initializing write state for relation %d", relation->rd_id); elog(NOTICE, "initializing write state for relation %d", relation->rd_id);
CStoreWriteState = CStoreBeginWrite(cstoreFdwOptions->filename, CStoreWriteState = CStoreBeginWrite(relation->rd_id,
cstoreFdwOptions->compressionType, cstoreOptions->compressionType,
cstoreFdwOptions->stripeRowCount, cstoreOptions->stripeRowCount,
cstoreFdwOptions->blockRowCount, cstoreOptions->blockRowCount,
tupdesc); tupdesc);
CStoreWriteState->relation = relation; CStoreWriteState->relation = relation;
@ -95,13 +105,14 @@ cstore_beginscan(Relation relation, Snapshot snapshot,
ParallelTableScanDesc parallel_scan, ParallelTableScanDesc parallel_scan,
uint32 flags) uint32 flags)
{ {
TupleDesc tupdesc = relation->rd_att; Oid relid = relation->rd_id;
CStoreFdwOptions *cstoreFdwOptions = NULL; TupleDesc tupdesc = relation->rd_att;
TableReadState *readState = NULL; CStoreOptions *cstoreOptions = NULL;
CStoreScanDesc scan = palloc(sizeof(CStoreScanDescData)); TableReadState *readState = NULL;
List *columnList = NIL; CStoreScanDesc scan = palloc(sizeof(CStoreScanDescData));
List *columnList = NIL;
cstoreFdwOptions = CStoreGetOptions(relation->rd_id); cstoreOptions = CStoreGetDefaultOptions();
scan->cs_base.rs_rd = relation; scan->cs_base.rs_rd = relation;
scan->cs_base.rs_snapshot = snapshot; scan->cs_base.rs_snapshot = snapshot;
@ -124,8 +135,8 @@ cstore_beginscan(Relation relation, Snapshot snapshot,
columnList = lappend(columnList, var); columnList = lappend(columnList, var);
} }
readState = CStoreBeginRead(cstoreFdwOptions->filename, tupdesc, readState = CStoreBeginRead(relid, tupdesc, columnList, NULL);
columnList, NULL); readState->relation = relation;
scan->cs_readState = readState; scan->cs_readState = readState;
@ -258,7 +269,7 @@ cstore_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
cstore_init_write_state(relation); cstore_init_write_state(relation);
heapTuple = GetSlotHeapTuple(slot); heapTuple = ExecCopySlotHeapTuple(slot);
if (HeapTupleHasExternal(heapTuple)) if (HeapTupleHasExternal(heapTuple))
{ {
/* detoast any toasted attributes */ /* detoast any toasted attributes */
@ -297,7 +308,7 @@ cstore_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
for (int i = 0; i < ntuples; i++) for (int i = 0; i < ntuples; i++)
{ {
TupleTableSlot *tupleSlot = slots[i]; TupleTableSlot *tupleSlot = slots[i];
HeapTuple heapTuple = GetSlotHeapTuple(tupleSlot); HeapTuple heapTuple = ExecCopySlotHeapTuple(tupleSlot);
if (HeapTupleHasExternal(heapTuple)) if (HeapTupleHasExternal(heapTuple))
{ {
@ -363,8 +374,7 @@ cstore_relation_set_new_filenode(Relation rel,
*freezeXid = RecentXmin; *freezeXid = RecentXmin;
*minmulti = GetOldestMultiXactId(); *minmulti = GetOldestMultiXactId();
srel = RelationCreateStorage(*newrnode, persistence); srel = RelationCreateStorage(*newrnode, persistence);
CreateCStoreDatabaseDirectory(MyDatabaseId); InitializeCStoreTableFile(rel->rd_id, rel, CStoreGetDefaultOptions());
InitializeCStoreTableFile(rel->rd_id, rel);
smgrclose(srel); smgrclose(srel);
} }

View File

@ -4,3 +4,4 @@
const TableAmRoutine *GetCstoreTableAmRoutine(void); const TableAmRoutine *GetCstoreTableAmRoutine(void);
Datum cstore_tableam_handler(PG_FUNCTION_ARGS); Datum cstore_tableam_handler(PG_FUNCTION_ARGS);
extern void cstore_free_write_state(void);