mirror of https://github.com/citusdata/citus.git
citus indent and Makefile fixup
parent
4dfec401ce
commit
d352cd07dd
13
Makefile
13
Makefile
|
@ -7,21 +7,22 @@ MODULE_big = cstore_fdw
|
|||
|
||||
VER := $(lastword $(shell pg_config --version))
|
||||
VER_WORDS = $(subst ., ,$(VER))
|
||||
MVER = $(firstword $(VER_WORDS))
|
||||
|
||||
# versions prior to 10 (those with 3 version numbers) not supported
|
||||
ifeq ($(words $(VER_WORDS)),3)
|
||||
# error for versions earlier than 10 so that lex comparison will work
|
||||
ifneq ($(shell printf '%02d' $(MVER)),$(MVER))
|
||||
$(error version $(VER) not supported)
|
||||
endif
|
||||
|
||||
MVER = $(firstword $(VER_WORDS))
|
||||
|
||||
# version >= 12?
|
||||
# lexicographic comparison of version number
|
||||
ifeq ($(lastword $(sort 12 $(MVER))),$(MVER))
|
||||
USE_TABLEAM = yes
|
||||
USE_FDW = yes
|
||||
else
|
||||
else ifeq ($(lastword $(sort 11 $(MVER))),$(MVER))
|
||||
USE_TABLEAM = no
|
||||
USE_FDW = yes
|
||||
else
|
||||
$(error version $(VER) is not supported)
|
||||
endif
|
||||
|
||||
PG_CPPFLAGS = -std=c11
|
||||
|
|
137
cstore_tableam.c
137
cstore_tableam.c
|
@ -36,8 +36,8 @@
|
|||
|
||||
typedef struct CStoreScanDescData
|
||||
{
|
||||
TableScanDescData cs_base;
|
||||
TableReadState *cs_readState;
|
||||
TableScanDescData cs_base;
|
||||
TableReadState *cs_readState;
|
||||
} CStoreScanDescData;
|
||||
|
||||
typedef struct CStoreScanDescData *CStoreScanDesc;
|
||||
|
@ -56,15 +56,16 @@ CStoreGetDefaultOptions(void)
|
|||
return cstoreOptions;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
cstore_init_write_state(Relation relation)
|
||||
{
|
||||
//TODO: upgrade lock to serialize writes
|
||||
/*TODO: upgrade lock to serialize writes */
|
||||
|
||||
if (CStoreWriteState != NULL)
|
||||
{
|
||||
// TODO: consider whether it's possible for a new write to start
|
||||
// before an old one is flushed
|
||||
/* TODO: consider whether it's possible for a new write to start */
|
||||
/* before an old one is flushed */
|
||||
Assert(CStoreWriteState->relation->rd_id == relation->rd_id);
|
||||
}
|
||||
|
||||
|
@ -93,35 +94,39 @@ cstore_init_write_state(Relation relation)
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
cstore_free_write_state()
|
||||
{
|
||||
if (CStoreWriteState != NULL)
|
||||
{
|
||||
elog(LOG, "flushing write state for relation %d", CStoreWriteState->relation->rd_id);
|
||||
elog(LOG, "flushing write state for relation %d",
|
||||
CStoreWriteState->relation->rd_id);
|
||||
CStoreEndWrite(CStoreWriteState);
|
||||
CStoreWriteState = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static const TupleTableSlotOps *
|
||||
cstore_slot_callbacks(Relation relation)
|
||||
{
|
||||
return &TTSOpsVirtual;
|
||||
}
|
||||
|
||||
|
||||
static TableScanDesc
|
||||
cstore_beginscan(Relation relation, Snapshot snapshot,
|
||||
int nkeys, ScanKey key,
|
||||
ParallelTableScanDesc parallel_scan,
|
||||
uint32 flags)
|
||||
{
|
||||
Oid relid = relation->rd_id;
|
||||
TupleDesc tupdesc = relation->rd_att;
|
||||
CStoreOptions *cstoreOptions = NULL;
|
||||
TableReadState *readState = NULL;
|
||||
CStoreScanDesc scan = palloc(sizeof(CStoreScanDescData));
|
||||
List *columnList = NIL;
|
||||
Oid relid = relation->rd_id;
|
||||
TupleDesc tupdesc = relation->rd_att;
|
||||
CStoreOptions *cstoreOptions = NULL;
|
||||
TableReadState *readState = NULL;
|
||||
CStoreScanDesc scan = palloc(sizeof(CStoreScanDescData));
|
||||
List *columnList = NIL;
|
||||
|
||||
cstoreOptions = CStoreGetDefaultOptions();
|
||||
|
||||
|
@ -134,19 +139,21 @@ cstore_beginscan(Relation relation, Snapshot snapshot,
|
|||
|
||||
for (int i = 0; i < tupdesc->natts; i++)
|
||||
{
|
||||
Index varno = 0;
|
||||
AttrNumber varattno = i+1;
|
||||
Oid vartype = tupdesc->attrs[i].atttypid;
|
||||
int32 vartypmod = 0;
|
||||
Oid varcollid = 0;
|
||||
Index varlevelsup = 0;
|
||||
Var *var = makeVar(varno, varattno, vartype, vartypmod,
|
||||
varcollid, varlevelsup);
|
||||
Index varno = 0;
|
||||
AttrNumber varattno = i + 1;
|
||||
Oid vartype = tupdesc->attrs[i].atttypid;
|
||||
int32 vartypmod = 0;
|
||||
Oid varcollid = 0;
|
||||
Index varlevelsup = 0;
|
||||
Var *var = makeVar(varno, varattno, vartype, vartypmod,
|
||||
varcollid, varlevelsup);
|
||||
|
||||
if (!tupdesc->attrs[i].attisdropped)
|
||||
{
|
||||
columnList = lappend(columnList, var);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
readState = CStoreBeginRead(relid, tupdesc, columnList, NULL);
|
||||
readState->relation = relation;
|
||||
|
||||
|
@ -155,6 +162,7 @@ cstore_beginscan(Relation relation, Snapshot snapshot,
|
|||
return ((TableScanDesc) scan);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
cstore_endscan(TableScanDesc sscan)
|
||||
{
|
||||
|
@ -162,13 +170,15 @@ cstore_endscan(TableScanDesc sscan)
|
|||
CStoreEndRead(scan->cs_readState);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
cstore_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
|
||||
bool allow_strat, bool allow_sync, bool allow_pagemode)
|
||||
bool allow_strat, bool allow_sync, bool allow_pagemode)
|
||||
{
|
||||
elog(ERROR, "cstore_rescan not implemented");
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
cstore_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
|
||||
{
|
||||
|
@ -181,51 +191,61 @@ cstore_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot
|
|||
memset(slot->tts_values, 0, sizeof(Datum) * natts);
|
||||
memset(slot->tts_isnull, true, sizeof(bool) * natts);
|
||||
|
||||
nextRowFound = CStoreReadNextRow(scan->cs_readState, slot->tts_values, slot->tts_isnull);
|
||||
nextRowFound = CStoreReadNextRow(scan->cs_readState, slot->tts_values,
|
||||
slot->tts_isnull);
|
||||
|
||||
if (!nextRowFound)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
ExecStoreVirtualTuple(slot);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
static Size
|
||||
cstore_parallelscan_estimate(Relation rel)
|
||||
{
|
||||
elog(ERROR, "cstore_parallelscan_estimate not implemented");
|
||||
}
|
||||
|
||||
|
||||
static Size
|
||||
cstore_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan)
|
||||
{
|
||||
elog(ERROR, "cstore_parallelscan_initialize not implemented");
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
cstore_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan)
|
||||
{
|
||||
elog(ERROR, "cstore_parallelscan_reinitialize not implemented");
|
||||
}
|
||||
|
||||
|
||||
static IndexFetchTableData *
|
||||
cstore_index_fetch_begin(Relation rel)
|
||||
{
|
||||
elog(ERROR, "cstore_index_fetch_begin not implemented");
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
cstore_index_fetch_reset(IndexFetchTableData *scan)
|
||||
{
|
||||
elog(ERROR, "cstore_index_fetch_reset not implemented");
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
cstore_index_fetch_end(IndexFetchTableData *scan)
|
||||
{
|
||||
elog(ERROR, "cstore_index_fetch_end not implemented");
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
cstore_index_fetch_tuple(struct IndexFetchTableData *scan,
|
||||
ItemPointer tid,
|
||||
|
@ -236,6 +256,7 @@ cstore_index_fetch_tuple(struct IndexFetchTableData *scan,
|
|||
elog(ERROR, "cstore_index_fetch_tuple not implemented");
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
cstore_fetch_row_version(Relation relation,
|
||||
ItemPointer tid,
|
||||
|
@ -245,6 +266,7 @@ cstore_fetch_row_version(Relation relation,
|
|||
elog(ERROR, "cstore_fetch_row_version not implemented");
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
cstore_get_latest_tid(TableScanDesc sscan,
|
||||
ItemPointer tid)
|
||||
|
@ -252,12 +274,14 @@ cstore_get_latest_tid(TableScanDesc sscan,
|
|||
elog(ERROR, "cstore_get_latest_tid not implemented");
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
cstore_tuple_tid_valid(TableScanDesc scan, ItemPointer tid)
|
||||
{
|
||||
elog(ERROR, "cstore_tuple_tid_valid not implemented");
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
cstore_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot,
|
||||
Snapshot snapshot)
|
||||
|
@ -265,6 +289,7 @@ cstore_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot,
|
|||
return true;
|
||||
}
|
||||
|
||||
|
||||
static TransactionId
|
||||
cstore_compute_xid_horizon_for_tuples(Relation rel,
|
||||
ItemPointerData *tids,
|
||||
|
@ -273,6 +298,7 @@ cstore_compute_xid_horizon_for_tuples(Relation rel,
|
|||
elog(ERROR, "cstore_compute_xid_horizon_for_tuples not implemented");
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
cstore_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
|
||||
int options, BulkInsertState bistate)
|
||||
|
@ -296,6 +322,7 @@ cstore_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
|
|||
CStoreWriteRow(CStoreWriteState, slot->tts_values, slot->tts_isnull);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
cstore_tuple_insert_speculative(Relation relation, TupleTableSlot *slot,
|
||||
CommandId cid, int options,
|
||||
|
@ -304,6 +331,7 @@ cstore_tuple_insert_speculative(Relation relation, TupleTableSlot *slot,
|
|||
elog(ERROR, "cstore_tuple_insert_speculative not implemented");
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
cstore_tuple_complete_speculative(Relation relation, TupleTableSlot *slot,
|
||||
uint32 specToken, bool succeeded)
|
||||
|
@ -311,6 +339,7 @@ cstore_tuple_complete_speculative(Relation relation, TupleTableSlot *slot,
|
|||
elog(ERROR, "cstore_tuple_complete_speculative not implemented");
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
cstore_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
|
||||
CommandId cid, int options, BulkInsertState bistate)
|
||||
|
@ -337,6 +366,7 @@ cstore_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
static TM_Result
|
||||
cstore_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
|
||||
Snapshot snapshot, Snapshot crosscheck, bool wait,
|
||||
|
@ -345,6 +375,7 @@ cstore_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
|
|||
elog(ERROR, "cstore_tuple_delete not implemented");
|
||||
}
|
||||
|
||||
|
||||
static TM_Result
|
||||
cstore_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
|
||||
CommandId cid, Snapshot snapshot, Snapshot crosscheck,
|
||||
|
@ -354,6 +385,7 @@ cstore_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
|
|||
elog(ERROR, "cstore_tuple_update not implemented");
|
||||
}
|
||||
|
||||
|
||||
static TM_Result
|
||||
cstore_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot,
|
||||
TupleTableSlot *slot, CommandId cid, LockTupleMode mode,
|
||||
|
@ -363,16 +395,18 @@ cstore_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot,
|
|||
elog(ERROR, "cstore_tuple_lock not implemented");
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
cstore_finish_bulk_insert(Relation relation, int options)
|
||||
{
|
||||
//TODO: flush relation like for heap?
|
||||
// free write state or only in ExecutorEnd_hook?
|
||||
/*TODO: flush relation like for heap? */
|
||||
/* free write state or only in ExecutorEnd_hook? */
|
||||
|
||||
// for COPY
|
||||
/* for COPY */
|
||||
cstore_free_write_state();
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
cstore_relation_set_new_filenode(Relation rel,
|
||||
const RelFileNode *newrnode,
|
||||
|
@ -390,18 +424,21 @@ cstore_relation_set_new_filenode(Relation rel,
|
|||
smgrclose(srel);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
cstore_relation_nontransactional_truncate(Relation rel)
|
||||
{
|
||||
elog(ERROR, "cstore_relation_nontransactional_truncate not implemented");
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
cstore_relation_copy_data(Relation rel, const RelFileNode *newrnode)
|
||||
{
|
||||
elog(ERROR, "cstore_relation_copy_data not implemented");
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
cstore_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
||||
Relation OldIndex, bool use_sort,
|
||||
|
@ -415,6 +452,7 @@ cstore_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
|||
elog(ERROR, "cstore_relation_copy_for_cluster not implemented");
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
cstore_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno,
|
||||
BufferAccessStrategy bstrategy)
|
||||
|
@ -422,6 +460,7 @@ cstore_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno,
|
|||
elog(ERROR, "cstore_scan_analyze_next_block not implemented");
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
cstore_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin,
|
||||
double *liverows, double *deadrows,
|
||||
|
@ -430,6 +469,7 @@ cstore_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin,
|
|||
elog(ERROR, "cstore_scan_analyze_next_tuple not implemented");
|
||||
}
|
||||
|
||||
|
||||
static double
|
||||
cstore_index_build_range_scan(Relation heapRelation,
|
||||
Relation indexRelation,
|
||||
|
@ -446,6 +486,7 @@ cstore_index_build_range_scan(Relation heapRelation,
|
|||
elog(ERROR, "cstore_index_build_range_scan not implemented");
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
cstore_index_validate_scan(Relation heapRelation,
|
||||
Relation indexRelation,
|
||||
|
@ -456,32 +497,39 @@ cstore_index_validate_scan(Relation heapRelation,
|
|||
elog(ERROR, "cstore_index_validate_scan not implemented");
|
||||
}
|
||||
|
||||
|
||||
static uint64
|
||||
cstore_relation_size(Relation rel, ForkNumber forkNumber)
|
||||
{
|
||||
uint64 nblocks = 0;
|
||||
uint64 nblocks = 0;
|
||||
|
||||
/* Open it at the smgr level if not already done */
|
||||
RelationOpenSmgr(rel);
|
||||
/* Open it at the smgr level if not already done */
|
||||
RelationOpenSmgr(rel);
|
||||
|
||||
/* InvalidForkNumber indicates returning the size for all forks */
|
||||
if (forkNumber == InvalidForkNumber)
|
||||
{
|
||||
for (int i = 0; i < MAX_FORKNUM; i++)
|
||||
nblocks += smgrnblocks(rel->rd_smgr, i);
|
||||
}
|
||||
else
|
||||
nblocks = smgrnblocks(rel->rd_smgr, forkNumber);
|
||||
/* InvalidForkNumber indicates returning the size for all forks */
|
||||
if (forkNumber == InvalidForkNumber)
|
||||
{
|
||||
for (int i = 0; i < MAX_FORKNUM; i++)
|
||||
{
|
||||
nblocks += smgrnblocks(rel->rd_smgr, i);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
nblocks = smgrnblocks(rel->rd_smgr, forkNumber);
|
||||
}
|
||||
|
||||
return nblocks * BLCKSZ;
|
||||
return nblocks * BLCKSZ;
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
cstore_relation_needs_toast_table(Relation rel)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
cstore_estimate_rel_size(Relation rel, int32 *attr_widths,
|
||||
BlockNumber *pages, double *tuples,
|
||||
|
@ -493,6 +541,7 @@ cstore_estimate_rel_size(Relation rel, int32 *attr_widths,
|
|||
*allvisfrac = 1.0;
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
cstore_scan_bitmap_next_block(TableScanDesc scan,
|
||||
TBMIterateResult *tbmres)
|
||||
|
@ -500,6 +549,7 @@ cstore_scan_bitmap_next_block(TableScanDesc scan,
|
|||
elog(ERROR, "cstore_scan_bitmap_next_block not implemented");
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
cstore_scan_bitmap_next_tuple(TableScanDesc scan,
|
||||
TBMIterateResult *tbmres,
|
||||
|
@ -508,12 +558,14 @@ cstore_scan_bitmap_next_tuple(TableScanDesc scan,
|
|||
elog(ERROR, "cstore_scan_bitmap_next_tuple not implemented");
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
cstore_scan_sample_next_block(TableScanDesc scan, SampleScanState *scanstate)
|
||||
{
|
||||
elog(ERROR, "cstore_scan_sample_next_block not implemented");
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
cstore_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate,
|
||||
TupleTableSlot *slot)
|
||||
|
@ -521,16 +573,22 @@ cstore_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate,
|
|||
elog(ERROR, "cstore_scan_sample_next_tuple not implemented");
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
CStoreExecutorEnd(QueryDesc *queryDesc)
|
||||
{
|
||||
cstore_free_write_state();
|
||||
if (PreviousExecutorEndHook)
|
||||
{
|
||||
PreviousExecutorEndHook(queryDesc);
|
||||
}
|
||||
else
|
||||
{
|
||||
standard_ExecutorEnd(queryDesc);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
cstore_tableam_init()
|
||||
{
|
||||
|
@ -538,12 +596,14 @@ cstore_tableam_init()
|
|||
ExecutorEnd_hook = CStoreExecutorEnd;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
cstore_tableam_finish()
|
||||
{
|
||||
ExecutorEnd_hook = PreviousExecutorEndHook;
|
||||
}
|
||||
|
||||
|
||||
static const TableAmRoutine cstore_am_methods = {
|
||||
.type = T_TableAmRoutine,
|
||||
|
||||
|
@ -606,6 +666,7 @@ GetCstoreTableAmRoutine(void)
|
|||
return &cstore_am_methods;
|
||||
}
|
||||
|
||||
|
||||
PG_FUNCTION_INFO_V1(cstore_tableam_handler);
|
||||
Datum
|
||||
cstore_tableam_handler(PG_FUNCTION_ARGS)
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
#include "fmgr.h"
|
||||
#include "access/tableam.h"
|
||||
|
||||
const TableAmRoutine *GetCstoreTableAmRoutine(void);
|
||||
const TableAmRoutine * GetCstoreTableAmRoutine(void);
|
||||
Datum cstore_tableam_handler(PG_FUNCTION_ARGS);
|
||||
extern void cstore_free_write_state(void);
|
||||
extern void cstore_tableam_init(void);
|
||||
|
|
Loading…
Reference in New Issue