From d352cd07dd009a2651d90d2079e1126d1d0c8b70 Mon Sep 17 00:00:00 2001 From: Jeff Davis Date: Wed, 16 Sep 2020 11:51:23 -0700 Subject: [PATCH] citus indent and Makefile fixup --- Makefile | 13 ++--- cstore_tableam.c | 137 ++++++++++++++++++++++++++++++++++------------- cstore_tableam.h | 2 +- 3 files changed, 107 insertions(+), 45 deletions(-) diff --git a/Makefile b/Makefile index 8fac03cc4..9cbf4095a 100644 --- a/Makefile +++ b/Makefile @@ -7,21 +7,22 @@ MODULE_big = cstore_fdw VER := $(lastword $(shell pg_config --version)) VER_WORDS = $(subst ., ,$(VER)) +MVER = $(firstword $(VER_WORDS)) -# versions prior to 10 (those with 3 version numbers) not supported -ifeq ($(words $(VER_WORDS)),3) +# error for versions earlier than 10 so that lex comparison will work +ifneq ($(shell printf '%02d' $(MVER)),$(MVER)) $(error version $(VER) not supported) endif -MVER = $(firstword $(VER_WORDS)) - -# version >= 12? +# lexicographic comparison of version number ifeq ($(lastword $(sort 12 $(MVER))),$(MVER)) USE_TABLEAM = yes USE_FDW = yes -else +else ifeq ($(lastword $(sort 11 $(MVER))),$(MVER)) USE_TABLEAM = no USE_FDW = yes +else +$(error version $(VER) is not supported) endif PG_CPPFLAGS = -std=c11 diff --git a/cstore_tableam.c b/cstore_tableam.c index 381f3edd8..fccb9fe6e 100644 --- a/cstore_tableam.c +++ b/cstore_tableam.c @@ -36,8 +36,8 @@ typedef struct CStoreScanDescData { - TableScanDescData cs_base; - TableReadState *cs_readState; + TableScanDescData cs_base; + TableReadState *cs_readState; } CStoreScanDescData; typedef struct CStoreScanDescData *CStoreScanDesc; @@ -56,15 +56,16 @@ CStoreGetDefaultOptions(void) return cstoreOptions; } + static void cstore_init_write_state(Relation relation) { - //TODO: upgrade lock to serialize writes + /*TODO: upgrade lock to serialize writes */ if (CStoreWriteState != NULL) { - // TODO: consider whether it's possible for a new write to start - // before an old one is flushed + /* TODO: consider whether it's possible for a new write to start */ + /* before an old one is flushed */ Assert(CStoreWriteState->relation->rd_id == relation->rd_id); } @@ -93,35 +94,39 @@ cstore_init_write_state(Relation relation) } } + void cstore_free_write_state() { if (CStoreWriteState != NULL) { - elog(LOG, "flushing write state for relation %d", CStoreWriteState->relation->rd_id); + elog(LOG, "flushing write state for relation %d", + CStoreWriteState->relation->rd_id); CStoreEndWrite(CStoreWriteState); CStoreWriteState = NULL; } } + static const TupleTableSlotOps * cstore_slot_callbacks(Relation relation) { return &TTSOpsVirtual; } + static TableScanDesc cstore_beginscan(Relation relation, Snapshot snapshot, int nkeys, ScanKey key, ParallelTableScanDesc parallel_scan, uint32 flags) { - Oid relid = relation->rd_id; - TupleDesc tupdesc = relation->rd_att; - CStoreOptions *cstoreOptions = NULL; - TableReadState *readState = NULL; - CStoreScanDesc scan = palloc(sizeof(CStoreScanDescData)); - List *columnList = NIL; + Oid relid = relation->rd_id; + TupleDesc tupdesc = relation->rd_att; + CStoreOptions *cstoreOptions = NULL; + TableReadState *readState = NULL; + CStoreScanDesc scan = palloc(sizeof(CStoreScanDescData)); + List *columnList = NIL; cstoreOptions = CStoreGetDefaultOptions(); @@ -134,19 +139,21 @@ cstore_beginscan(Relation relation, Snapshot snapshot, for (int i = 0; i < tupdesc->natts; i++) { - Index varno = 0; - AttrNumber varattno = i+1; - Oid vartype = tupdesc->attrs[i].atttypid; - int32 vartypmod = 0; - Oid varcollid = 0; - Index varlevelsup = 0; - Var *var = makeVar(varno, varattno, vartype, vartypmod, - varcollid, varlevelsup); + Index varno = 0; + AttrNumber varattno = i + 1; + Oid vartype = tupdesc->attrs[i].atttypid; + int32 vartypmod = 0; + Oid varcollid = 0; + Index varlevelsup = 0; + Var *var = makeVar(varno, varattno, vartype, vartypmod, + varcollid, varlevelsup); if (!tupdesc->attrs[i].attisdropped) + { columnList = lappend(columnList, var); + } } - + readState = CStoreBeginRead(relid, tupdesc, columnList, NULL); readState->relation = relation; @@ -155,6 +162,7 @@ cstore_beginscan(Relation relation, Snapshot snapshot, return ((TableScanDesc) scan); } + static void cstore_endscan(TableScanDesc sscan) { @@ -162,13 +170,15 @@ cstore_endscan(TableScanDesc sscan) CStoreEndRead(scan->cs_readState); } + static void cstore_rescan(TableScanDesc sscan, ScanKey key, bool set_params, - bool allow_strat, bool allow_sync, bool allow_pagemode) + bool allow_strat, bool allow_sync, bool allow_pagemode) { elog(ERROR, "cstore_rescan not implemented"); } + static bool cstore_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot) { @@ -181,51 +191,61 @@ cstore_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot memset(slot->tts_values, 0, sizeof(Datum) * natts); memset(slot->tts_isnull, true, sizeof(bool) * natts); - nextRowFound = CStoreReadNextRow(scan->cs_readState, slot->tts_values, slot->tts_isnull); + nextRowFound = CStoreReadNextRow(scan->cs_readState, slot->tts_values, + slot->tts_isnull); if (!nextRowFound) + { return false; + } ExecStoreVirtualTuple(slot); return true; } + static Size cstore_parallelscan_estimate(Relation rel) { elog(ERROR, "cstore_parallelscan_estimate not implemented"); } + static Size cstore_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan) { elog(ERROR, "cstore_parallelscan_initialize not implemented"); } + static void cstore_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan) { elog(ERROR, "cstore_parallelscan_reinitialize not implemented"); } + static IndexFetchTableData * cstore_index_fetch_begin(Relation rel) { elog(ERROR, "cstore_index_fetch_begin not implemented"); } + static void cstore_index_fetch_reset(IndexFetchTableData *scan) { elog(ERROR, "cstore_index_fetch_reset not implemented"); } + static void cstore_index_fetch_end(IndexFetchTableData *scan) { elog(ERROR, "cstore_index_fetch_end not implemented"); } + static bool cstore_index_fetch_tuple(struct IndexFetchTableData *scan, ItemPointer tid, @@ -236,6 +256,7 @@ cstore_index_fetch_tuple(struct IndexFetchTableData *scan, elog(ERROR, "cstore_index_fetch_tuple not implemented"); } + static bool cstore_fetch_row_version(Relation relation, ItemPointer tid, @@ -245,6 +266,7 @@ cstore_fetch_row_version(Relation relation, elog(ERROR, "cstore_fetch_row_version not implemented"); } + static void cstore_get_latest_tid(TableScanDesc sscan, ItemPointer tid) @@ -252,12 +274,14 @@ cstore_get_latest_tid(TableScanDesc sscan, elog(ERROR, "cstore_get_latest_tid not implemented"); } + static bool cstore_tuple_tid_valid(TableScanDesc scan, ItemPointer tid) { elog(ERROR, "cstore_tuple_tid_valid not implemented"); } + static bool cstore_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot, Snapshot snapshot) @@ -265,6 +289,7 @@ cstore_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot, return true; } + static TransactionId cstore_compute_xid_horizon_for_tuples(Relation rel, ItemPointerData *tids, @@ -273,6 +298,7 @@ cstore_compute_xid_horizon_for_tuples(Relation rel, elog(ERROR, "cstore_compute_xid_horizon_for_tuples not implemented"); } + static void cstore_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid, int options, BulkInsertState bistate) @@ -296,6 +322,7 @@ cstore_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid, CStoreWriteRow(CStoreWriteState, slot->tts_values, slot->tts_isnull); } + static void cstore_tuple_insert_speculative(Relation relation, TupleTableSlot *slot, CommandId cid, int options, @@ -304,6 +331,7 @@ cstore_tuple_insert_speculative(Relation relation, TupleTableSlot *slot, elog(ERROR, "cstore_tuple_insert_speculative not implemented"); } + static void cstore_tuple_complete_speculative(Relation relation, TupleTableSlot *slot, uint32 specToken, bool succeeded) @@ -311,6 +339,7 @@ cstore_tuple_complete_speculative(Relation relation, TupleTableSlot *slot, elog(ERROR, "cstore_tuple_complete_speculative not implemented"); } + static void cstore_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate) @@ -337,6 +366,7 @@ cstore_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, } } + static TM_Result cstore_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot snapshot, Snapshot crosscheck, bool wait, @@ -345,6 +375,7 @@ cstore_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, elog(ERROR, "cstore_tuple_delete not implemented"); } + static TM_Result cstore_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot, CommandId cid, Snapshot snapshot, Snapshot crosscheck, @@ -354,6 +385,7 @@ cstore_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot, elog(ERROR, "cstore_tuple_update not implemented"); } + static TM_Result cstore_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot, TupleTableSlot *slot, CommandId cid, LockTupleMode mode, @@ -363,16 +395,18 @@ cstore_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot, elog(ERROR, "cstore_tuple_lock not implemented"); } + static void cstore_finish_bulk_insert(Relation relation, int options) { - //TODO: flush relation like for heap? - // free write state or only in ExecutorEnd_hook? + /*TODO: flush relation like for heap? */ + /* free write state or only in ExecutorEnd_hook? */ - // for COPY + /* for COPY */ cstore_free_write_state(); } + static void cstore_relation_set_new_filenode(Relation rel, const RelFileNode *newrnode, @@ -390,18 +424,21 @@ cstore_relation_set_new_filenode(Relation rel, smgrclose(srel); } + static void cstore_relation_nontransactional_truncate(Relation rel) { elog(ERROR, "cstore_relation_nontransactional_truncate not implemented"); } + static void cstore_relation_copy_data(Relation rel, const RelFileNode *newrnode) { elog(ERROR, "cstore_relation_copy_data not implemented"); } + static void cstore_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, Relation OldIndex, bool use_sort, @@ -415,6 +452,7 @@ cstore_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, elog(ERROR, "cstore_relation_copy_for_cluster not implemented"); } + static bool cstore_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno, BufferAccessStrategy bstrategy) @@ -422,6 +460,7 @@ cstore_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno, elog(ERROR, "cstore_scan_analyze_next_block not implemented"); } + static bool cstore_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin, double *liverows, double *deadrows, @@ -430,6 +469,7 @@ cstore_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin, elog(ERROR, "cstore_scan_analyze_next_tuple not implemented"); } + static double cstore_index_build_range_scan(Relation heapRelation, Relation indexRelation, @@ -446,6 +486,7 @@ cstore_index_build_range_scan(Relation heapRelation, elog(ERROR, "cstore_index_build_range_scan not implemented"); } + static void cstore_index_validate_scan(Relation heapRelation, Relation indexRelation, @@ -456,32 +497,39 @@ cstore_index_validate_scan(Relation heapRelation, elog(ERROR, "cstore_index_validate_scan not implemented"); } + static uint64 cstore_relation_size(Relation rel, ForkNumber forkNumber) { - uint64 nblocks = 0; + uint64 nblocks = 0; - /* Open it at the smgr level if not already done */ - RelationOpenSmgr(rel); + /* Open it at the smgr level if not already done */ + RelationOpenSmgr(rel); - /* InvalidForkNumber indicates returning the size for all forks */ - if (forkNumber == InvalidForkNumber) - { - for (int i = 0; i < MAX_FORKNUM; i++) - nblocks += smgrnblocks(rel->rd_smgr, i); - } - else - nblocks = smgrnblocks(rel->rd_smgr, forkNumber); + /* InvalidForkNumber indicates returning the size for all forks */ + if (forkNumber == InvalidForkNumber) + { + for (int i = 0; i < MAX_FORKNUM; i++) + { + nblocks += smgrnblocks(rel->rd_smgr, i); + } + } + else + { + nblocks = smgrnblocks(rel->rd_smgr, forkNumber); + } - return nblocks * BLCKSZ; + return nblocks * BLCKSZ; } + static bool cstore_relation_needs_toast_table(Relation rel) { return false; } + static void cstore_estimate_rel_size(Relation rel, int32 *attr_widths, BlockNumber *pages, double *tuples, @@ -493,6 +541,7 @@ cstore_estimate_rel_size(Relation rel, int32 *attr_widths, *allvisfrac = 1.0; } + static bool cstore_scan_bitmap_next_block(TableScanDesc scan, TBMIterateResult *tbmres) @@ -500,6 +549,7 @@ cstore_scan_bitmap_next_block(TableScanDesc scan, elog(ERROR, "cstore_scan_bitmap_next_block not implemented"); } + static bool cstore_scan_bitmap_next_tuple(TableScanDesc scan, TBMIterateResult *tbmres, @@ -508,12 +558,14 @@ cstore_scan_bitmap_next_tuple(TableScanDesc scan, elog(ERROR, "cstore_scan_bitmap_next_tuple not implemented"); } + static bool cstore_scan_sample_next_block(TableScanDesc scan, SampleScanState *scanstate) { elog(ERROR, "cstore_scan_sample_next_block not implemented"); } + static bool cstore_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate, TupleTableSlot *slot) @@ -521,16 +573,22 @@ cstore_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate, elog(ERROR, "cstore_scan_sample_next_tuple not implemented"); } + static void CStoreExecutorEnd(QueryDesc *queryDesc) { cstore_free_write_state(); if (PreviousExecutorEndHook) + { PreviousExecutorEndHook(queryDesc); + } else + { standard_ExecutorEnd(queryDesc); + } } + void cstore_tableam_init() { @@ -538,12 +596,14 @@ cstore_tableam_init() ExecutorEnd_hook = CStoreExecutorEnd; } + void cstore_tableam_finish() { ExecutorEnd_hook = PreviousExecutorEndHook; } + static const TableAmRoutine cstore_am_methods = { .type = T_TableAmRoutine, @@ -606,6 +666,7 @@ GetCstoreTableAmRoutine(void) return &cstore_am_methods; } + PG_FUNCTION_INFO_V1(cstore_tableam_handler); Datum cstore_tableam_handler(PG_FUNCTION_ARGS) diff --git a/cstore_tableam.h b/cstore_tableam.h index bd1f3805e..3a556728a 100644 --- a/cstore_tableam.h +++ b/cstore_tableam.h @@ -2,7 +2,7 @@ #include "fmgr.h" #include "access/tableam.h" -const TableAmRoutine *GetCstoreTableAmRoutine(void); +const TableAmRoutine * GetCstoreTableAmRoutine(void); Datum cstore_tableam_handler(PG_FUNCTION_ARGS); extern void cstore_free_write_state(void); extern void cstore_tableam_init(void);