mirror of https://github.com/citusdata/citus.git
3080 lines
84 KiB
C
3080 lines
84 KiB
C
#include "citus_version.h"
|
|
|
|
#include "postgres.h"
|
|
|
|
#include <math.h>
|
|
|
|
#include "miscadmin.h"
|
|
|
|
#include "access/genam.h"
|
|
#include "access/heapam.h"
|
|
#include "access/multixact.h"
|
|
#include "access/rewriteheap.h"
|
|
#include "access/tableam.h"
|
|
#include "access/tsmapi.h"
|
|
#include "access/detoast.h"
|
|
#include "access/xact.h"
|
|
#include "catalog/catalog.h"
|
|
#include "catalog/index.h"
|
|
#include "catalog/namespace.h"
|
|
#include "catalog/objectaccess.h"
|
|
#include "catalog/pg_am.h"
|
|
#include "catalog/pg_publication.h"
|
|
#include "catalog/pg_trigger.h"
|
|
#include "catalog/pg_extension.h"
|
|
#include "catalog/storage.h"
|
|
#include "catalog/storage_xlog.h"
|
|
#include "commands/defrem.h"
|
|
#include "commands/progress.h"
|
|
#include "commands/vacuum.h"
|
|
#include "commands/extension.h"
|
|
#include "executor/executor.h"
|
|
#include "nodes/makefuncs.h"
|
|
#include "optimizer/plancat.h"
|
|
#include "pgstat.h"
|
|
#include "safe_lib.h"
|
|
#include "storage/bufmgr.h"
|
|
#include "storage/bufpage.h"
|
|
#include "storage/bufmgr.h"
|
|
#include "storage/lmgr.h"
|
|
#include "storage/predicate.h"
|
|
#include "storage/procarray.h"
|
|
#include "storage/smgr.h"
|
|
#include "tcop/utility.h"
|
|
#include "utils/builtins.h"
|
|
#include "utils/fmgroids.h"
|
|
#include "utils/memutils.h"
|
|
#include "utils/pg_rusage.h"
|
|
#include "utils/rel.h"
|
|
#include "utils/relcache.h"
|
|
#include "utils/lsyscache.h"
|
|
#include "utils/syscache.h"
|
|
#include "columnar/columnar.h"
|
|
#include "columnar/columnar_customscan.h"
|
|
#include "columnar/columnar_storage.h"
|
|
#include "columnar/columnar_tableam.h"
|
|
#include "columnar/columnar_version_compat.h"
|
|
#include "distributed/listutils.h"
|
|
|
|
/*
|
|
* Timing parameters for truncate locking heuristics.
|
|
*
|
|
* These are the same values from src/backend/access/heap/vacuumlazy.c
|
|
*/
|
|
#define VACUUM_TRUNCATE_LOCK_WAIT_INTERVAL 50 /* ms */
|
|
#define VACUUM_TRUNCATE_LOCK_TIMEOUT 4500 /* ms */
|
|
|
|
/*
|
|
* ColumnarScanDescData is the scan state passed between beginscan(),
|
|
* getnextslot(), rescan(), and endscan() calls.
|
|
*/
|
|
typedef struct ColumnarScanDescData
|
|
{
|
|
TableScanDescData cs_base;
|
|
ColumnarReadState *cs_readState;
|
|
|
|
/*
|
|
* We initialize cs_readState lazily in the first getnextslot() call. We
|
|
* need the following for initialization. We save them in beginscan().
|
|
*/
|
|
MemoryContext scanContext;
|
|
Bitmapset *attr_needed;
|
|
List *scanQual;
|
|
} ColumnarScanDescData;
|
|
|
|
|
|
/*
|
|
* IndexFetchColumnarData is the scan state passed between index_fetch_begin,
|
|
* index_fetch_reset, index_fetch_end, index_fetch_tuple calls.
|
|
*/
|
|
typedef struct IndexFetchColumnarData
|
|
{
|
|
IndexFetchTableData cs_base;
|
|
ColumnarReadState *cs_readState;
|
|
|
|
/*
|
|
* We initialize cs_readState lazily in the first columnar_index_fetch_tuple
|
|
* call. However, we want to do memory allocations in a sub MemoryContext of
|
|
* columnar_index_fetch_begin. For this reason, we store scanContext in
|
|
* columnar_index_fetch_begin.
|
|
*/
|
|
MemoryContext scanContext;
|
|
} IndexFetchColumnarData;
|
|
|
|
static object_access_hook_type PrevObjectAccessHook = NULL;
|
|
static ProcessUtility_hook_type PrevProcessUtilityHook = NULL;
|
|
|
|
/* forward declaration for static functions */
|
|
static MemoryContext CreateColumnarScanMemoryContext(void);
|
|
static void ColumnarTableDropHook(Oid tgid);
|
|
static void ColumnarTriggerCreateHook(Oid tgid);
|
|
static void ColumnarTableAMObjectAccessHook(ObjectAccessType access, Oid classId,
|
|
Oid objectId, int subId,
|
|
void *arg);
|
|
static RangeVar * ColumnarProcessAlterTable(AlterTableStmt *alterTableStmt,
|
|
List **columnarOptions);
|
|
static void ColumnarProcessUtility(PlannedStmt *pstmt,
|
|
const char *queryString,
|
|
#if PG_VERSION_NUM >= PG_VERSION_14
|
|
bool readOnlyTree,
|
|
#endif
|
|
ProcessUtilityContext context,
|
|
ParamListInfo params,
|
|
struct QueryEnvironment *queryEnv,
|
|
DestReceiver *dest,
|
|
QueryCompletion *completionTag);
|
|
static bool ConditionalLockRelationWithTimeout(Relation rel, LOCKMODE lockMode,
|
|
int timeout, int retryInterval);
|
|
static List * NeededColumnsList(TupleDesc tupdesc, Bitmapset *attr_needed);
|
|
static void LogRelationStats(Relation rel, int elevel);
|
|
static void TruncateColumnar(Relation rel, int elevel);
|
|
static HeapTuple ColumnarSlotCopyHeapTuple(TupleTableSlot *slot);
|
|
static void ColumnarCheckLogicalReplication(Relation rel);
|
|
static Datum * detoast_values(TupleDesc tupleDesc, Datum *orig_values, bool *isnull);
|
|
static ItemPointerData row_number_to_tid(uint64 rowNumber);
|
|
static uint64 tid_to_row_number(ItemPointerData tid);
|
|
static void ErrorIfInvalidRowNumber(uint64 rowNumber);
|
|
static void ColumnarReportTotalVirtualBlocks(Relation relation, Snapshot snapshot,
|
|
int progressArrIndex);
|
|
static BlockNumber ColumnarGetNumberOfVirtualBlocks(Relation relation, Snapshot snapshot);
|
|
static ItemPointerData ColumnarGetHighestItemPointer(Relation relation,
|
|
Snapshot snapshot);
|
|
static double ColumnarReadRowsIntoIndex(TableScanDesc scan,
|
|
Relation indexRelation,
|
|
IndexInfo *indexInfo,
|
|
bool progress,
|
|
IndexBuildCallback indexCallback,
|
|
void *indexCallbackState,
|
|
EState *estate, ExprState *predicate);
|
|
static void ColumnarReadMissingRowsIntoIndex(TableScanDesc scan, Relation indexRelation,
|
|
IndexInfo *indexInfo, EState *estate,
|
|
ExprState *predicate,
|
|
ValidateIndexState *state);
|
|
static ItemPointerData TupleSortSkipSmallerItemPointers(Tuplesortstate *tupleSort,
|
|
ItemPointer targetItemPointer);
|
|
|
|
/* functions for CheckCitusColumnarVersion */
|
|
static bool CheckAvailableVersionColumnar(int elevel);
|
|
static bool CheckInstalledVersionColumnar(int elevel);
|
|
static char * AvailableExtensionVersionColumnar(void);
|
|
static char * InstalledExtensionVersionColumnar(void);
|
|
static bool CitusColumnarHasBeenLoadedInternal(void);
|
|
static bool CitusColumnarHasBeenLoaded(void);
|
|
static bool CheckCitusColumnarVersion(int elevel);
|
|
static bool MajorVersionsCompatibleColumnar(char *leftVersion, char *rightVersion);
|
|
|
|
/* global variables for CheckCitusColumnarVersion */
|
|
static bool extensionLoadedColumnar = false;
|
|
static bool EnableVersionChecksColumnar = true;
|
|
static bool citusVersionKnownCompatibleColumnar = false;
|
|
|
|
/* Custom tuple slot ops used for columnar. Initialized in columnar_tableam_init(). */
|
|
static TupleTableSlotOps TTSOpsColumnar;
|
|
|
|
static const TupleTableSlotOps *
|
|
columnar_slot_callbacks(Relation relation)
|
|
{
|
|
return &TTSOpsColumnar;
|
|
}
|
|
|
|
|
|
static TableScanDesc
|
|
columnar_beginscan(Relation relation, Snapshot snapshot,
|
|
int nkeys, ScanKey key,
|
|
ParallelTableScanDesc parallel_scan,
|
|
uint32 flags)
|
|
{
|
|
CheckCitusColumnarVersion(ERROR);
|
|
|
|
int natts = relation->rd_att->natts;
|
|
|
|
/* attr_needed represents 0-indexed attribute numbers */
|
|
Bitmapset *attr_needed = bms_add_range(NULL, 0, natts - 1);
|
|
|
|
TableScanDesc scandesc = columnar_beginscan_extended(relation, snapshot, nkeys, key,
|
|
parallel_scan,
|
|
flags, attr_needed, NULL);
|
|
|
|
bms_free(attr_needed);
|
|
|
|
return scandesc;
|
|
}
|
|
|
|
|
|
TableScanDesc
|
|
columnar_beginscan_extended(Relation relation, Snapshot snapshot,
|
|
int nkeys, ScanKey key,
|
|
ParallelTableScanDesc parallel_scan,
|
|
uint32 flags, Bitmapset *attr_needed, List *scanQual)
|
|
{
|
|
CheckCitusColumnarVersion(ERROR);
|
|
Oid relfilenode = relation->rd_node.relNode;
|
|
|
|
/*
|
|
* A memory context to use for scan-wide data, including the lazily
|
|
* initialized read state. We assume that beginscan is called in a
|
|
* context that will last until end of scan.
|
|
*/
|
|
MemoryContext scanContext = CreateColumnarScanMemoryContext();
|
|
MemoryContext oldContext = MemoryContextSwitchTo(scanContext);
|
|
|
|
ColumnarScanDesc scan = palloc0(sizeof(ColumnarScanDescData));
|
|
scan->cs_base.rs_rd = relation;
|
|
scan->cs_base.rs_snapshot = snapshot;
|
|
scan->cs_base.rs_nkeys = nkeys;
|
|
scan->cs_base.rs_key = key;
|
|
scan->cs_base.rs_flags = flags;
|
|
scan->cs_base.rs_parallel = parallel_scan;
|
|
|
|
/*
|
|
* We will initialize this lazily in first tuple, where we have the actual
|
|
* tuple descriptor to use for reading. In some cases like ALTER TABLE ...
|
|
* ALTER COLUMN ... TYPE, the tuple descriptor of relation doesn't match
|
|
* the storage which we are reading, so we need to use the tuple descriptor
|
|
* of "slot" in first read.
|
|
*/
|
|
scan->cs_readState = NULL;
|
|
scan->attr_needed = bms_copy(attr_needed);
|
|
scan->scanQual = copyObject(scanQual);
|
|
scan->scanContext = scanContext;
|
|
|
|
if (PendingWritesInUpperTransactions(relfilenode, GetCurrentSubTransactionId()))
|
|
{
|
|
elog(ERROR,
|
|
"cannot read from table when there is unflushed data in upper transactions");
|
|
}
|
|
|
|
MemoryContextSwitchTo(oldContext);
|
|
|
|
return ((TableScanDesc) scan);
|
|
}
|
|
|
|
|
|
/*
|
|
* CreateColumnarScanMemoryContext creates a memory context to store
|
|
* ColumnarReadStare in it.
|
|
*/
|
|
static MemoryContext
|
|
CreateColumnarScanMemoryContext(void)
|
|
{
|
|
return AllocSetContextCreate(CurrentMemoryContext, "Columnar Scan Context",
|
|
ALLOCSET_DEFAULT_SIZES);
|
|
}
|
|
|
|
|
|
/*
|
|
* init_columnar_read_state initializes a column store table read and returns the
|
|
* state.
|
|
*/
|
|
static ColumnarReadState *
|
|
init_columnar_read_state(Relation relation, TupleDesc tupdesc, Bitmapset *attr_needed,
|
|
List *scanQual, MemoryContext scanContext, Snapshot snapshot,
|
|
bool randomAccess)
|
|
{
|
|
MemoryContext oldContext = MemoryContextSwitchTo(scanContext);
|
|
|
|
List *neededColumnList = NeededColumnsList(tupdesc, attr_needed);
|
|
ColumnarReadState *readState = ColumnarBeginRead(relation, tupdesc, neededColumnList,
|
|
scanQual, scanContext, snapshot,
|
|
randomAccess);
|
|
|
|
MemoryContextSwitchTo(oldContext);
|
|
|
|
return readState;
|
|
}
|
|
|
|
|
|
static void
|
|
columnar_endscan(TableScanDesc sscan)
|
|
{
|
|
ColumnarScanDesc scan = (ColumnarScanDesc) sscan;
|
|
if (scan->cs_readState != NULL)
|
|
{
|
|
ColumnarEndRead(scan->cs_readState);
|
|
scan->cs_readState = NULL;
|
|
}
|
|
|
|
if (scan->cs_base.rs_flags & SO_TEMP_SNAPSHOT)
|
|
{
|
|
UnregisterSnapshot(scan->cs_base.rs_snapshot);
|
|
}
|
|
}
|
|
|
|
|
|
static void
|
|
columnar_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
|
|
bool allow_strat, bool allow_sync, bool allow_pagemode)
|
|
{
|
|
ColumnarScanDesc scan = (ColumnarScanDesc) sscan;
|
|
|
|
/* XXX: hack to pass in new quals that aren't actually scan keys */
|
|
List *scanQual = (List *) key;
|
|
|
|
if (scan->cs_readState != NULL)
|
|
{
|
|
ColumnarRescan(scan->cs_readState, scanQual);
|
|
}
|
|
}
|
|
|
|
|
|
static bool
|
|
columnar_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
|
|
{
|
|
ColumnarScanDesc scan = (ColumnarScanDesc) sscan;
|
|
|
|
/*
|
|
* if this is the first row, initialize read state.
|
|
*/
|
|
if (scan->cs_readState == NULL)
|
|
{
|
|
bool randomAccess = false;
|
|
scan->cs_readState =
|
|
init_columnar_read_state(scan->cs_base.rs_rd, slot->tts_tupleDescriptor,
|
|
scan->attr_needed, scan->scanQual,
|
|
scan->scanContext, scan->cs_base.rs_snapshot,
|
|
randomAccess);
|
|
}
|
|
|
|
ExecClearTuple(slot);
|
|
|
|
uint64 rowNumber;
|
|
bool nextRowFound = ColumnarReadNextRow(scan->cs_readState, slot->tts_values,
|
|
slot->tts_isnull, &rowNumber);
|
|
|
|
if (!nextRowFound)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
ExecStoreVirtualTuple(slot);
|
|
|
|
slot->tts_tid = row_number_to_tid(rowNumber);
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
/*
|
|
* row_number_to_tid maps given rowNumber to ItemPointerData.
|
|
*/
|
|
static ItemPointerData
|
|
row_number_to_tid(uint64 rowNumber)
|
|
{
|
|
ErrorIfInvalidRowNumber(rowNumber);
|
|
|
|
ItemPointerData tid = { 0 };
|
|
ItemPointerSetBlockNumber(&tid, rowNumber / VALID_ITEMPOINTER_OFFSETS);
|
|
ItemPointerSetOffsetNumber(&tid, rowNumber % VALID_ITEMPOINTER_OFFSETS +
|
|
FirstOffsetNumber);
|
|
return tid;
|
|
}
|
|
|
|
|
|
/*
|
|
* tid_to_row_number maps given ItemPointerData to rowNumber.
|
|
*/
|
|
static uint64
|
|
tid_to_row_number(ItemPointerData tid)
|
|
{
|
|
uint64 rowNumber = ItemPointerGetBlockNumber(&tid) * VALID_ITEMPOINTER_OFFSETS +
|
|
ItemPointerGetOffsetNumber(&tid) - FirstOffsetNumber;
|
|
|
|
ErrorIfInvalidRowNumber(rowNumber);
|
|
|
|
return rowNumber;
|
|
}
|
|
|
|
|
|
/*
|
|
* ErrorIfInvalidRowNumber errors out if given rowNumber is invalid.
|
|
*/
|
|
static void
|
|
ErrorIfInvalidRowNumber(uint64 rowNumber)
|
|
{
|
|
if (rowNumber == COLUMNAR_INVALID_ROW_NUMBER)
|
|
{
|
|
/* not expected but be on the safe side */
|
|
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
|
|
errmsg("unexpected row number for columnar table")));
|
|
}
|
|
else if (rowNumber > COLUMNAR_MAX_ROW_NUMBER)
|
|
{
|
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
errmsg("columnar tables can't have row numbers "
|
|
"greater than " UINT64_FORMAT,
|
|
(uint64) COLUMNAR_MAX_ROW_NUMBER),
|
|
errhint("Consider using VACUUM FULL for your table")));
|
|
}
|
|
}
|
|
|
|
|
|
static Size
|
|
columnar_parallelscan_estimate(Relation rel)
|
|
{
|
|
elog(ERROR, "columnar_parallelscan_estimate not implemented");
|
|
}
|
|
|
|
|
|
static Size
|
|
columnar_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan)
|
|
{
|
|
elog(ERROR, "columnar_parallelscan_initialize not implemented");
|
|
}
|
|
|
|
|
|
static void
|
|
columnar_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan)
|
|
{
|
|
elog(ERROR, "columnar_parallelscan_reinitialize not implemented");
|
|
}
|
|
|
|
|
|
static IndexFetchTableData *
|
|
columnar_index_fetch_begin(Relation rel)
|
|
{
|
|
CheckCitusColumnarVersion(ERROR);
|
|
|
|
Oid relfilenode = rel->rd_node.relNode;
|
|
if (PendingWritesInUpperTransactions(relfilenode, GetCurrentSubTransactionId()))
|
|
{
|
|
/* XXX: maybe we can just flush the data and continue */
|
|
elog(ERROR, "cannot read from index when there is unflushed data in "
|
|
"upper transactions");
|
|
}
|
|
|
|
MemoryContext scanContext = CreateColumnarScanMemoryContext();
|
|
MemoryContext oldContext = MemoryContextSwitchTo(scanContext);
|
|
|
|
IndexFetchColumnarData *scan = palloc0(sizeof(IndexFetchColumnarData));
|
|
scan->cs_base.rel = rel;
|
|
scan->cs_readState = NULL;
|
|
scan->scanContext = scanContext;
|
|
|
|
MemoryContextSwitchTo(oldContext);
|
|
|
|
return &scan->cs_base;
|
|
}
|
|
|
|
|
|
static void
|
|
columnar_index_fetch_reset(IndexFetchTableData *sscan)
|
|
{
|
|
/* no-op */
|
|
}
|
|
|
|
|
|
static void
|
|
columnar_index_fetch_end(IndexFetchTableData *sscan)
|
|
{
|
|
columnar_index_fetch_reset(sscan);
|
|
|
|
IndexFetchColumnarData *scan = (IndexFetchColumnarData *) sscan;
|
|
if (scan->cs_readState)
|
|
{
|
|
ColumnarEndRead(scan->cs_readState);
|
|
scan->cs_readState = NULL;
|
|
}
|
|
}
|
|
|
|
|
|
static bool
|
|
columnar_index_fetch_tuple(struct IndexFetchTableData *sscan,
|
|
ItemPointer tid,
|
|
Snapshot snapshot,
|
|
TupleTableSlot *slot,
|
|
bool *call_again, bool *all_dead)
|
|
{
|
|
/* no HOT chains are possible in columnar, directly set it to false */
|
|
*call_again = false;
|
|
|
|
/*
|
|
* Initialize all_dead to false if passed to be non-NULL.
|
|
*
|
|
* XXX: For aborted writes, we should set all_dead to true but this would
|
|
* require implementing columnar_index_delete_tuples for simple deletion
|
|
* of dead tuples (TM_IndexDeleteOp.bottomup = false).
|
|
*/
|
|
if (all_dead)
|
|
{
|
|
*all_dead = false;
|
|
}
|
|
|
|
ExecClearTuple(slot);
|
|
|
|
IndexFetchColumnarData *scan = (IndexFetchColumnarData *) sscan;
|
|
Relation columnarRelation = scan->cs_base.rel;
|
|
|
|
/* initialize read state for the first row */
|
|
if (scan->cs_readState == NULL)
|
|
{
|
|
/* we need all columns */
|
|
int natts = columnarRelation->rd_att->natts;
|
|
Bitmapset *attr_needed = bms_add_range(NULL, 0, natts - 1);
|
|
|
|
/* no quals for index scan */
|
|
List *scanQual = NIL;
|
|
|
|
bool randomAccess = true;
|
|
scan->cs_readState = init_columnar_read_state(columnarRelation,
|
|
slot->tts_tupleDescriptor,
|
|
attr_needed, scanQual,
|
|
scan->scanContext,
|
|
snapshot, randomAccess);
|
|
}
|
|
|
|
uint64 rowNumber = tid_to_row_number(*tid);
|
|
StripeMetadata *stripeMetadata =
|
|
FindStripeWithMatchingFirstRowNumber(columnarRelation, rowNumber, snapshot);
|
|
if (!stripeMetadata)
|
|
{
|
|
/* it is certain that tuple with rowNumber doesn't exist */
|
|
return false;
|
|
}
|
|
|
|
StripeWriteStateEnum stripeWriteState = StripeWriteState(stripeMetadata);
|
|
if (stripeWriteState == STRIPE_WRITE_FLUSHED &&
|
|
!ColumnarReadRowByRowNumber(scan->cs_readState, rowNumber,
|
|
slot->tts_values, slot->tts_isnull))
|
|
{
|
|
/*
|
|
* FindStripeWithMatchingFirstRowNumber doesn't verify upper row
|
|
* number boundary of found stripe. For this reason, we didn't
|
|
* certainly know if given row number belongs to one of the stripes.
|
|
*/
|
|
return false;
|
|
}
|
|
else if (stripeWriteState == STRIPE_WRITE_ABORTED)
|
|
{
|
|
/*
|
|
* We only expect to see un-flushed stripes when checking against
|
|
* constraint violation. In that case, indexAM provides dirty
|
|
* snapshot to index_fetch_tuple callback.
|
|
*/
|
|
Assert(snapshot->snapshot_type == SNAPSHOT_DIRTY);
|
|
return false;
|
|
}
|
|
else if (stripeWriteState == STRIPE_WRITE_IN_PROGRESS)
|
|
{
|
|
if (stripeMetadata->insertedByCurrentXact)
|
|
{
|
|
/*
|
|
* Stripe write is in progress and its entry is inserted by current
|
|
* transaction, so obviously it must be written by me. Since caller
|
|
* might want to use tupleslot datums for some reason, do another
|
|
* look-up, but this time by first flushing our writes.
|
|
*
|
|
* XXX: For index scan, this is the only case that we flush pending
|
|
* writes of the current backend. If we have taught reader how to
|
|
* read from WriteStateMap. then we could guarantee that
|
|
* index_fetch_tuple would never flush pending writes, but this seem
|
|
* to be too much work for now, but should be doable.
|
|
*/
|
|
ColumnarReadFlushPendingWrites(scan->cs_readState);
|
|
|
|
/*
|
|
* Fill the tupleslot and fall through to return true, it
|
|
* certainly exists.
|
|
*/
|
|
ColumnarReadRowByRowNumberOrError(scan->cs_readState, rowNumber,
|
|
slot->tts_values, slot->tts_isnull);
|
|
}
|
|
else
|
|
{
|
|
/* similar to aborted writes, it should be dirty snapshot */
|
|
Assert(snapshot->snapshot_type == SNAPSHOT_DIRTY);
|
|
|
|
/*
|
|
* Stripe that "might" contain the tuple with rowNumber is not
|
|
* flushed yet. Here we set all attributes of given tupleslot to NULL
|
|
* before returning true and expect the indexAM callback that called
|
|
* us --possibly to check against constraint violation-- blocks until
|
|
* writer transaction commits or aborts, without requiring us to fill
|
|
* the tupleslot properly.
|
|
*
|
|
* XXX: Note that the assumption we made above for the tupleslot
|
|
* holds for "unique" constraints defined on "btree" indexes.
|
|
*
|
|
* For the other constraints that we support, namely:
|
|
* * exclusion on btree,
|
|
* * exclusion on hash,
|
|
* * unique on btree;
|
|
* we still need to fill tts_values.
|
|
*
|
|
* However, for the same reason, we should have already flushed
|
|
* single tuple stripes when inserting into table for those three
|
|
* classes of constraints.
|
|
*
|
|
* This is annoying, but this also explains why this hack works for
|
|
* unique constraints on btree indexes, and also explains how we
|
|
* would never end up with "else" condition otherwise.
|
|
*/
|
|
memset(slot->tts_isnull, true, slot->tts_nvalid * sizeof(bool));
|
|
}
|
|
}
|
|
else
|
|
{
|
|
/*
|
|
* At this point, we certainly know that stripe is flushed and
|
|
* ColumnarReadRowByRowNumber successfully filled the tupleslot.
|
|
*/
|
|
Assert(stripeWriteState == STRIPE_WRITE_FLUSHED);
|
|
}
|
|
|
|
slot->tts_tableOid = RelationGetRelid(columnarRelation);
|
|
slot->tts_tid = *tid;
|
|
ExecStoreVirtualTuple(slot);
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
static bool
|
|
columnar_fetch_row_version(Relation relation,
|
|
ItemPointer tid,
|
|
Snapshot snapshot,
|
|
TupleTableSlot *slot)
|
|
{
|
|
elog(ERROR, "columnar_fetch_row_version not implemented");
|
|
}
|
|
|
|
|
|
static void
|
|
columnar_get_latest_tid(TableScanDesc sscan,
|
|
ItemPointer tid)
|
|
{
|
|
elog(ERROR, "columnar_get_latest_tid not implemented");
|
|
}
|
|
|
|
|
|
static bool
|
|
columnar_tuple_tid_valid(TableScanDesc scan, ItemPointer tid)
|
|
{
|
|
elog(ERROR, "columnar_tuple_tid_valid not implemented");
|
|
}
|
|
|
|
|
|
static bool
|
|
columnar_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot,
|
|
Snapshot snapshot)
|
|
{
|
|
CheckCitusColumnarVersion(ERROR);
|
|
|
|
uint64 rowNumber = tid_to_row_number(slot->tts_tid);
|
|
StripeMetadata *stripeMetadata = FindStripeByRowNumber(rel, rowNumber, snapshot);
|
|
return stripeMetadata != NULL;
|
|
}
|
|
|
|
|
|
#if PG_VERSION_NUM >= PG_VERSION_14
|
|
static TransactionId
|
|
columnar_index_delete_tuples(Relation rel,
|
|
TM_IndexDeleteOp *delstate)
|
|
{
|
|
CheckCitusColumnarVersion(ERROR);
|
|
|
|
/*
|
|
* XXX: We didn't bother implementing index_delete_tuple for neither of
|
|
* simple deletion and bottom-up deletion cases. There is no particular
|
|
* reason for that, just to keep things simple.
|
|
*
|
|
* See the rest of this function to see how we deal with
|
|
* index_delete_tuples requests made to columnarAM.
|
|
*/
|
|
|
|
if (delstate->bottomup)
|
|
{
|
|
/*
|
|
* Ignore any bottom-up deletion requests.
|
|
*
|
|
* Currently only caller in postgres that does bottom-up deletion is
|
|
* _bt_bottomupdel_pass, which in turn calls _bt_delitems_delete_check.
|
|
* And this function is okay with ndeltids being set to 0 by tableAM
|
|
* for bottom-up deletion.
|
|
*/
|
|
delstate->ndeltids = 0;
|
|
return InvalidTransactionId;
|
|
}
|
|
else
|
|
{
|
|
/*
|
|
* TableAM is not expected to set ndeltids to 0 for simple deletion
|
|
* case, so here we cannot do the same trick that we do for
|
|
* bottom-up deletion.
|
|
* See the assertion around table_index_delete_tuples call in pg
|
|
* function index_compute_xid_horizon_for_tuples.
|
|
*
|
|
* For this reason, to avoid receiving simple deletion requests for
|
|
* columnar tables (bottomup = false), columnar_index_fetch_tuple
|
|
* doesn't ever set all_dead to true in order to prevent triggering
|
|
* simple deletion of index tuples. But let's throw an error to be on
|
|
* the safe side.
|
|
*/
|
|
elog(ERROR, "columnar_index_delete_tuples not implemented for simple deletion");
|
|
}
|
|
}
|
|
|
|
|
|
#else
|
|
static TransactionId
|
|
columnar_compute_xid_horizon_for_tuples(Relation rel,
|
|
ItemPointerData *tids,
|
|
int nitems)
|
|
{
|
|
elog(ERROR, "columnar_compute_xid_horizon_for_tuples not implemented");
|
|
}
|
|
|
|
|
|
#endif
|
|
|
|
|
|
static void
|
|
columnar_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
|
|
int options, BulkInsertState bistate)
|
|
{
|
|
CheckCitusColumnarVersion(ERROR);
|
|
|
|
/*
|
|
* columnar_init_write_state allocates the write state in a longer
|
|
* lasting context, so no need to worry about it.
|
|
*/
|
|
ColumnarWriteState *writeState = columnar_init_write_state(relation,
|
|
RelationGetDescr(relation),
|
|
slot->tts_tableOid,
|
|
GetCurrentSubTransactionId());
|
|
|
|
MemoryContext oldContext = MemoryContextSwitchTo(ColumnarWritePerTupleContext(
|
|
writeState));
|
|
|
|
ColumnarCheckLogicalReplication(relation);
|
|
|
|
slot_getallattrs(slot);
|
|
|
|
Datum *values = detoast_values(slot->tts_tupleDescriptor,
|
|
slot->tts_values, slot->tts_isnull);
|
|
|
|
uint64 writtenRowNumber = ColumnarWriteRow(writeState, values, slot->tts_isnull);
|
|
slot->tts_tid = row_number_to_tid(writtenRowNumber);
|
|
|
|
MemoryContextSwitchTo(oldContext);
|
|
MemoryContextReset(ColumnarWritePerTupleContext(writeState));
|
|
}
|
|
|
|
|
|
static void
|
|
columnar_tuple_insert_speculative(Relation relation, TupleTableSlot *slot,
|
|
CommandId cid, int options,
|
|
BulkInsertState bistate, uint32 specToken)
|
|
{
|
|
elog(ERROR, "columnar_tuple_insert_speculative not implemented");
|
|
}
|
|
|
|
|
|
static void
|
|
columnar_tuple_complete_speculative(Relation relation, TupleTableSlot *slot,
|
|
uint32 specToken, bool succeeded)
|
|
{
|
|
elog(ERROR, "columnar_tuple_complete_speculative not implemented");
|
|
}
|
|
|
|
|
|
static void
|
|
columnar_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
|
|
CommandId cid, int options, BulkInsertState bistate)
|
|
{
|
|
CheckCitusColumnarVersion(ERROR);
|
|
|
|
/*
|
|
* The callback to .multi_insert is table_multi_insert() and this is only used for the COPY
|
|
* command, so slot[i]->tts_tableoid will always be equal to relation->id. Thus, we can send
|
|
* RelationGetRelid(relation) as the tupSlotTableOid
|
|
*/
|
|
ColumnarWriteState *writeState = columnar_init_write_state(relation,
|
|
RelationGetDescr(relation),
|
|
RelationGetRelid(relation),
|
|
GetCurrentSubTransactionId());
|
|
|
|
ColumnarCheckLogicalReplication(relation);
|
|
|
|
MemoryContext oldContext = MemoryContextSwitchTo(ColumnarWritePerTupleContext(
|
|
writeState));
|
|
|
|
for (int i = 0; i < ntuples; i++)
|
|
{
|
|
TupleTableSlot *tupleSlot = slots[i];
|
|
|
|
slot_getallattrs(tupleSlot);
|
|
|
|
Datum *values = detoast_values(tupleSlot->tts_tupleDescriptor,
|
|
tupleSlot->tts_values, tupleSlot->tts_isnull);
|
|
|
|
uint64 writtenRowNumber = ColumnarWriteRow(writeState, values,
|
|
tupleSlot->tts_isnull);
|
|
tupleSlot->tts_tid = row_number_to_tid(writtenRowNumber);
|
|
|
|
MemoryContextReset(ColumnarWritePerTupleContext(writeState));
|
|
}
|
|
|
|
MemoryContextSwitchTo(oldContext);
|
|
}
|
|
|
|
|
|
static TM_Result
|
|
columnar_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
|
|
Snapshot snapshot, Snapshot crosscheck, bool wait,
|
|
TM_FailureData *tmfd, bool changingPart)
|
|
{
|
|
elog(ERROR, "columnar_tuple_delete not implemented");
|
|
}
|
|
|
|
|
|
static TM_Result
|
|
columnar_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
|
|
CommandId cid, Snapshot snapshot, Snapshot crosscheck,
|
|
bool wait, TM_FailureData *tmfd,
|
|
LockTupleMode *lockmode, bool *update_indexes)
|
|
{
|
|
elog(ERROR, "columnar_tuple_update not implemented");
|
|
}
|
|
|
|
|
|
static TM_Result
|
|
columnar_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot,
|
|
TupleTableSlot *slot, CommandId cid, LockTupleMode mode,
|
|
LockWaitPolicy wait_policy, uint8 flags,
|
|
TM_FailureData *tmfd)
|
|
{
|
|
elog(ERROR, "columnar_tuple_lock not implemented");
|
|
}
|
|
|
|
|
|
static void
|
|
columnar_finish_bulk_insert(Relation relation, int options)
|
|
{
|
|
/*
|
|
* Nothing to do here. We keep write states live until transaction end.
|
|
*/
|
|
}
|
|
|
|
|
|
static void
|
|
columnar_relation_set_new_filenode(Relation rel,
|
|
const RelFileNode *newrnode,
|
|
char persistence,
|
|
TransactionId *freezeXid,
|
|
MultiXactId *minmulti)
|
|
{
|
|
CheckCitusColumnarVersion(ERROR);
|
|
|
|
if (persistence == RELPERSISTENCE_UNLOGGED)
|
|
{
|
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
errmsg("unlogged columnar tables are not supported")));
|
|
}
|
|
|
|
/*
|
|
* If existing and new relfilenode are different, that means the existing
|
|
* storage was dropped and we also need to clean up the metadata and
|
|
* state. If they are equal, this is a new relation object and we don't
|
|
* need to clean anything.
|
|
*/
|
|
if (rel->rd_node.relNode != newrnode->relNode)
|
|
{
|
|
MarkRelfilenodeDropped(rel->rd_node.relNode, GetCurrentSubTransactionId());
|
|
|
|
DeleteMetadataRows(rel->rd_node);
|
|
}
|
|
|
|
*freezeXid = RecentXmin;
|
|
*minmulti = GetOldestMultiXactId();
|
|
SMgrRelation srel = RelationCreateStorage_compat(*newrnode, persistence, true);
|
|
|
|
ColumnarStorageInit(srel, ColumnarMetadataNewStorageId());
|
|
InitColumnarOptions(rel->rd_id);
|
|
|
|
smgrclose(srel);
|
|
|
|
/* we will lazily initialize metadata in first stripe reservation */
|
|
}
|
|
|
|
|
|
static void
|
|
columnar_relation_nontransactional_truncate(Relation rel)
|
|
{
|
|
CheckCitusColumnarVersion(ERROR);
|
|
RelFileNode relfilenode = rel->rd_node;
|
|
|
|
NonTransactionDropWriteState(relfilenode.relNode);
|
|
|
|
/* Delete old relfilenode metadata */
|
|
DeleteMetadataRows(relfilenode);
|
|
|
|
/*
|
|
* No need to set new relfilenode, since the table was created in this
|
|
* transaction and no other transaction can see this relation yet. We
|
|
* can just truncate the relation.
|
|
*
|
|
* This is similar to what is done in heapam_relation_nontransactional_truncate.
|
|
*/
|
|
RelationTruncate(rel, 0);
|
|
|
|
uint64 storageId = ColumnarMetadataNewStorageId();
|
|
ColumnarStorageInit(RelationGetSmgr(rel), storageId);
|
|
}
|
|
|
|
|
|
static void
|
|
columnar_relation_copy_data(Relation rel, const RelFileNode *newrnode)
|
|
{
|
|
elog(ERROR, "columnar_relation_copy_data not implemented");
|
|
}
|
|
|
|
|
|
/*
|
|
* columnar_relation_copy_for_cluster is called on VACUUM FULL, at which
|
|
* we should copy data from OldHeap to NewHeap.
|
|
*
|
|
* In general TableAM case this can also be called for the CLUSTER command
|
|
* which is not applicable for columnar since it doesn't support indexes.
|
|
*/
|
|
static void
|
|
columnar_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
|
Relation OldIndex, bool use_sort,
|
|
TransactionId OldestXmin,
|
|
TransactionId *xid_cutoff,
|
|
MultiXactId *multi_cutoff,
|
|
double *num_tuples,
|
|
double *tups_vacuumed,
|
|
double *tups_recently_dead)
|
|
{
|
|
CheckCitusColumnarVersion(ERROR);
|
|
|
|
TupleDesc sourceDesc = RelationGetDescr(OldHeap);
|
|
TupleDesc targetDesc = RelationGetDescr(NewHeap);
|
|
|
|
if (OldIndex != NULL || use_sort)
|
|
{
|
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
errmsg("clustering columnar tables using indexes is "
|
|
"not supported")));
|
|
}
|
|
|
|
/*
|
|
* copy_table_data in cluster.c assumes tuple descriptors are exactly
|
|
* the same. Even dropped columns exist and are marked as attisdropped
|
|
* in the target relation.
|
|
*/
|
|
Assert(sourceDesc->natts == targetDesc->natts);
|
|
|
|
/* read settings from old heap, relfilenode will be swapped at the end */
|
|
ColumnarOptions columnarOptions = { 0 };
|
|
ReadColumnarOptions(OldHeap->rd_id, &columnarOptions);
|
|
|
|
ColumnarWriteState *writeState = ColumnarBeginWrite(NewHeap->rd_node,
|
|
columnarOptions,
|
|
targetDesc);
|
|
|
|
/* we need all columns */
|
|
int natts = OldHeap->rd_att->natts;
|
|
Bitmapset *attr_needed = bms_add_range(NULL, 0, natts - 1);
|
|
|
|
/* no quals for table rewrite */
|
|
List *scanQual = NIL;
|
|
|
|
/* use SnapshotAny when re-writing table as heapAM does */
|
|
Snapshot snapshot = SnapshotAny;
|
|
|
|
MemoryContext scanContext = CreateColumnarScanMemoryContext();
|
|
bool randomAccess = false;
|
|
ColumnarReadState *readState = init_columnar_read_state(OldHeap, sourceDesc,
|
|
attr_needed, scanQual,
|
|
scanContext, snapshot,
|
|
randomAccess);
|
|
|
|
Datum *values = palloc0(sourceDesc->natts * sizeof(Datum));
|
|
bool *nulls = palloc0(sourceDesc->natts * sizeof(bool));
|
|
|
|
*num_tuples = 0;
|
|
|
|
/* we don't need to know rowNumber here */
|
|
while (ColumnarReadNextRow(readState, values, nulls, NULL))
|
|
{
|
|
ColumnarWriteRow(writeState, values, nulls);
|
|
(*num_tuples)++;
|
|
}
|
|
|
|
*tups_vacuumed = 0;
|
|
|
|
ColumnarEndWrite(writeState);
|
|
ColumnarEndRead(readState);
|
|
}
|
|
|
|
|
|
/*
|
|
* NeededColumnsList returns a list of AttrNumber's for the columns that
|
|
* are not dropped and specified by attr_needed.
|
|
*/
|
|
static List *
|
|
NeededColumnsList(TupleDesc tupdesc, Bitmapset *attr_needed)
|
|
{
|
|
List *columnList = NIL;
|
|
|
|
for (int i = 0; i < tupdesc->natts; i++)
|
|
{
|
|
if (tupdesc->attrs[i].attisdropped)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
/* attr_needed is 0-indexed but columnList is 1-indexed */
|
|
if (bms_is_member(i, attr_needed))
|
|
{
|
|
AttrNumber varattno = i + 1;
|
|
columnList = lappend_int(columnList, varattno);
|
|
}
|
|
}
|
|
|
|
return columnList;
|
|
}
|
|
|
|
|
|
/*
|
|
* ColumnarTableTupleCount returns the number of tuples that columnar
|
|
* table with relationId has by using stripe metadata.
|
|
*/
|
|
static uint64
|
|
ColumnarTableTupleCount(Relation relation)
|
|
{
|
|
List *stripeList = StripesForRelfilenode(relation->rd_node);
|
|
uint64 tupleCount = 0;
|
|
|
|
ListCell *lc = NULL;
|
|
foreach(lc, stripeList)
|
|
{
|
|
StripeMetadata *stripe = lfirst(lc);
|
|
tupleCount += stripe->rowCount;
|
|
}
|
|
|
|
return tupleCount;
|
|
}
|
|
|
|
|
|
/*
|
|
* columnar_vacuum_rel implements VACUUM without FULL option.
|
|
*/
|
|
static void
|
|
columnar_vacuum_rel(Relation rel, VacuumParams *params,
|
|
BufferAccessStrategy bstrategy)
|
|
{
|
|
if (!CheckCitusColumnarVersion(WARNING))
|
|
{
|
|
/*
|
|
* Skip if the extension catalogs are not up-to-date, but avoid
|
|
* erroring during auto-vacuum.
|
|
*/
|
|
return;
|
|
}
|
|
|
|
pgstat_progress_start_command(PROGRESS_COMMAND_VACUUM,
|
|
RelationGetRelid(rel));
|
|
|
|
/*
|
|
* If metapage version of relation is older, then we hint users to VACUUM
|
|
* the relation in ColumnarMetapageCheckVersion. So if needed, upgrade
|
|
* the metapage before doing anything.
|
|
*/
|
|
bool isUpgrade = true;
|
|
ColumnarStorageUpdateIfNeeded(rel, isUpgrade);
|
|
|
|
int elevel = (params->options & VACOPT_VERBOSE) ? INFO : DEBUG2;
|
|
|
|
/* this should have been resolved by vacuum.c until now */
|
|
Assert(params->truncate != VACOPTVALUE_UNSPECIFIED);
|
|
|
|
LogRelationStats(rel, elevel);
|
|
|
|
/*
|
|
* We don't have updates, deletes, or concurrent updates, so all we
|
|
* care for now is truncating the unused space at the end of storage.
|
|
*/
|
|
if (params->truncate == VACOPTVALUE_ENABLED)
|
|
{
|
|
TruncateColumnar(rel, elevel);
|
|
}
|
|
|
|
BlockNumber new_rel_pages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
|
|
|
|
/* get the number of indexes */
|
|
List *indexList = RelationGetIndexList(rel);
|
|
int nindexes = list_length(indexList);
|
|
|
|
TransactionId oldestXmin;
|
|
TransactionId freezeLimit;
|
|
MultiXactId multiXactCutoff;
|
|
|
|
/* initialize xids */
|
|
#if PG_VERSION_NUM >= PG_VERSION_15
|
|
MultiXactId oldestMxact;
|
|
vacuum_set_xid_limits(rel,
|
|
params->freeze_min_age,
|
|
params->freeze_table_age,
|
|
params->multixact_freeze_min_age,
|
|
params->multixact_freeze_table_age,
|
|
&oldestXmin, &oldestMxact,
|
|
&freezeLimit, &multiXactCutoff);
|
|
|
|
Assert(MultiXactIdPrecedesOrEquals(multiXactCutoff, oldestMxact));
|
|
#else
|
|
TransactionId xidFullScanLimit;
|
|
MultiXactId mxactFullScanLimit;
|
|
vacuum_set_xid_limits(rel,
|
|
params->freeze_min_age,
|
|
params->freeze_table_age,
|
|
params->multixact_freeze_min_age,
|
|
params->multixact_freeze_table_age,
|
|
&oldestXmin, &freezeLimit, &xidFullScanLimit,
|
|
&multiXactCutoff, &mxactFullScanLimit);
|
|
#endif
|
|
|
|
Assert(TransactionIdPrecedesOrEquals(freezeLimit, oldestXmin));
|
|
|
|
/*
|
|
* Columnar storage doesn't hold any transaction IDs, so we can always
|
|
* just advance to the most aggressive value.
|
|
*/
|
|
TransactionId newRelFrozenXid = oldestXmin;
|
|
#if PG_VERSION_NUM >= PG_VERSION_15
|
|
MultiXactId newRelminMxid = oldestMxact;
|
|
#else
|
|
MultiXactId newRelminMxid = multiXactCutoff;
|
|
#endif
|
|
|
|
double new_live_tuples = ColumnarTableTupleCount(rel);
|
|
|
|
/* all visible pages are always 0 */
|
|
BlockNumber new_rel_allvisible = 0;
|
|
|
|
#if PG_VERSION_NUM >= PG_VERSION_15
|
|
bool frozenxid_updated;
|
|
bool minmulti_updated;
|
|
|
|
vac_update_relstats(rel, new_rel_pages, new_live_tuples,
|
|
new_rel_allvisible, nindexes > 0,
|
|
newRelFrozenXid, newRelminMxid,
|
|
&frozenxid_updated, &minmulti_updated, false);
|
|
#else
|
|
vac_update_relstats(rel, new_rel_pages, new_live_tuples,
|
|
new_rel_allvisible, nindexes > 0,
|
|
newRelFrozenXid, newRelminMxid, false);
|
|
#endif
|
|
|
|
pgstat_report_vacuum(RelationGetRelid(rel),
|
|
rel->rd_rel->relisshared,
|
|
Max(new_live_tuples, 0),
|
|
0);
|
|
pgstat_progress_end_command();
|
|
}
|
|
|
|
|
|
/*
|
|
* LogRelationStats logs statistics as the output of the VACUUM VERBOSE.
|
|
*/
|
|
static void
|
|
LogRelationStats(Relation rel, int elevel)
|
|
{
|
|
ListCell *stripeMetadataCell = NULL;
|
|
RelFileNode relfilenode = rel->rd_node;
|
|
StringInfo infoBuf = makeStringInfo();
|
|
|
|
int compressionStats[COMPRESSION_COUNT] = { 0 };
|
|
uint64 totalStripeLength = 0;
|
|
uint64 tupleCount = 0;
|
|
uint64 chunkCount = 0;
|
|
TupleDesc tupdesc = RelationGetDescr(rel);
|
|
uint64 droppedChunksWithData = 0;
|
|
uint64 totalDecompressedLength = 0;
|
|
|
|
List *stripeList = StripesForRelfilenode(relfilenode);
|
|
int stripeCount = list_length(stripeList);
|
|
|
|
foreach(stripeMetadataCell, stripeList)
|
|
{
|
|
StripeMetadata *stripe = lfirst(stripeMetadataCell);
|
|
StripeSkipList *skiplist = ReadStripeSkipList(relfilenode, stripe->id,
|
|
RelationGetDescr(rel),
|
|
stripe->chunkCount,
|
|
GetTransactionSnapshot());
|
|
for (uint32 column = 0; column < skiplist->columnCount; column++)
|
|
{
|
|
bool attrDropped = tupdesc->attrs[column].attisdropped;
|
|
for (uint32 chunk = 0; chunk < skiplist->chunkCount; chunk++)
|
|
{
|
|
ColumnChunkSkipNode *skipnode =
|
|
&skiplist->chunkSkipNodeArray[column][chunk];
|
|
|
|
/* ignore zero length chunks for dropped attributes */
|
|
if (skipnode->valueLength > 0)
|
|
{
|
|
compressionStats[skipnode->valueCompressionType]++;
|
|
chunkCount++;
|
|
|
|
if (attrDropped)
|
|
{
|
|
droppedChunksWithData++;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* We don't compress exists buffer, so its compressed & decompressed
|
|
* lengths are the same.
|
|
*/
|
|
totalDecompressedLength += skipnode->existsLength;
|
|
totalDecompressedLength += skipnode->decompressedValueSize;
|
|
}
|
|
}
|
|
|
|
tupleCount += stripe->rowCount;
|
|
totalStripeLength += stripe->dataLength;
|
|
}
|
|
|
|
uint64 relPages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
|
|
RelationCloseSmgr(rel);
|
|
|
|
Datum storageId = DirectFunctionCall1(columnar_relation_storageid,
|
|
ObjectIdGetDatum(RelationGetRelid(rel)));
|
|
|
|
double compressionRate = totalStripeLength ?
|
|
(double) totalDecompressedLength / totalStripeLength :
|
|
1.0;
|
|
|
|
appendStringInfo(infoBuf, "storage id: %ld\n", DatumGetInt64(storageId));
|
|
appendStringInfo(infoBuf, "total file size: %ld, total data size: %ld\n",
|
|
relPages * BLCKSZ, totalStripeLength);
|
|
appendStringInfo(infoBuf, "compression rate: %.2fx\n", compressionRate);
|
|
appendStringInfo(infoBuf,
|
|
"total row count: %ld, stripe count: %d, "
|
|
"average rows per stripe: %ld\n",
|
|
tupleCount, stripeCount,
|
|
stripeCount ? tupleCount / stripeCount : 0);
|
|
appendStringInfo(infoBuf,
|
|
"chunk count: %ld"
|
|
", containing data for dropped columns: %ld",
|
|
chunkCount, droppedChunksWithData);
|
|
for (int compressionType = 0; compressionType < COMPRESSION_COUNT; compressionType++)
|
|
{
|
|
const char *compressionName = CompressionTypeStr(compressionType);
|
|
|
|
/* skip if this compression algorithm has not been compiled */
|
|
if (compressionName == NULL)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
/* skip if no chunks use this compression type */
|
|
if (compressionStats[compressionType] == 0)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
appendStringInfo(infoBuf,
|
|
", %s compressed: %d",
|
|
compressionName,
|
|
compressionStats[compressionType]);
|
|
}
|
|
appendStringInfoString(infoBuf, "\n");
|
|
|
|
ereport(elevel, (errmsg("statistics for \"%s\":\n%s", RelationGetRelationName(rel),
|
|
infoBuf->data)));
|
|
}
|
|
|
|
|
|
/*
|
|
* TruncateColumnar truncates the unused space at the end of main fork for
|
|
* a columnar table. This unused space can be created by aborted transactions.
|
|
*
|
|
* This implementation is based on heap_vacuum_rel in vacuumlazy.c with some
|
|
* changes so it suits columnar store relations.
|
|
*/
|
|
static void
|
|
TruncateColumnar(Relation rel, int elevel)
|
|
{
|
|
PGRUsage ru0;
|
|
|
|
pg_rusage_init(&ru0);
|
|
|
|
/* Report that we are now truncating */
|
|
pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
|
|
PROGRESS_VACUUM_PHASE_TRUNCATE);
|
|
|
|
|
|
/*
|
|
* We need access exclusive lock on the relation in order to do
|
|
* truncation. If we can't get it, give up rather than waiting --- we
|
|
* don't want to block other backends, and we don't want to deadlock
|
|
* (which is quite possible considering we already hold a lower-grade
|
|
* lock).
|
|
*
|
|
* The decisions for AccessExclusiveLock and conditional lock with
|
|
* a timeout is based on lazy_truncate_heap in vacuumlazy.c.
|
|
*/
|
|
if (!ConditionalLockRelationWithTimeout(rel, AccessExclusiveLock,
|
|
VACUUM_TRUNCATE_LOCK_TIMEOUT,
|
|
VACUUM_TRUNCATE_LOCK_WAIT_INTERVAL))
|
|
{
|
|
/*
|
|
* We failed to establish the lock in the specified number of
|
|
* retries. This means we give up truncating.
|
|
*/
|
|
ereport(elevel,
|
|
(errmsg("\"%s\": stopping truncate due to conflicting lock request",
|
|
RelationGetRelationName(rel))));
|
|
return;
|
|
}
|
|
|
|
/*
|
|
* Due to the AccessExclusive lock there's no danger that
|
|
* new stripes be added beyond highestPhysicalAddress while
|
|
* we're truncating.
|
|
*/
|
|
uint64 newDataReservation = Max(GetHighestUsedAddress(rel->rd_node) + 1,
|
|
ColumnarFirstLogicalOffset);
|
|
|
|
BlockNumber old_rel_pages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
|
|
|
|
if (!ColumnarStorageTruncate(rel, newDataReservation))
|
|
{
|
|
UnlockRelation(rel, AccessExclusiveLock);
|
|
return;
|
|
}
|
|
|
|
BlockNumber new_rel_pages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
|
|
|
|
/*
|
|
* We can release the exclusive lock as soon as we have truncated.
|
|
* Other backends can't safely access the relation until they have
|
|
* processed the smgr invalidation that smgrtruncate sent out ... but
|
|
* that should happen as part of standard invalidation processing once
|
|
* they acquire lock on the relation.
|
|
*/
|
|
UnlockRelation(rel, AccessExclusiveLock);
|
|
|
|
ereport(elevel,
|
|
(errmsg("\"%s\": truncated %u to %u pages",
|
|
RelationGetRelationName(rel),
|
|
old_rel_pages, new_rel_pages),
|
|
errdetail_internal("%s", pg_rusage_show(&ru0))));
|
|
}
|
|
|
|
|
|
/*
|
|
* ConditionalLockRelationWithTimeout tries to acquire a relation lock until
|
|
* it either succeeds or timesout. It doesn't enter wait queue and instead it
|
|
* sleeps between lock tries.
|
|
*
|
|
* This is based on the lock loop in lazy_truncate_heap().
|
|
*/
|
|
static bool
|
|
ConditionalLockRelationWithTimeout(Relation rel, LOCKMODE lockMode, int timeout,
|
|
int retryInterval)
|
|
{
|
|
int lock_retry = 0;
|
|
|
|
while (true)
|
|
{
|
|
if (ConditionalLockRelation(rel, lockMode))
|
|
{
|
|
break;
|
|
}
|
|
|
|
/*
|
|
* Check for interrupts while trying to (re-)acquire the lock
|
|
*/
|
|
CHECK_FOR_INTERRUPTS();
|
|
|
|
if (++lock_retry > (timeout / retryInterval))
|
|
{
|
|
return false;
|
|
}
|
|
|
|
pg_usleep(retryInterval * 1000L);
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
static bool
|
|
columnar_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno,
|
|
BufferAccessStrategy bstrategy)
|
|
{
|
|
/*
|
|
* Our access method is not pages based, i.e. tuples are not confined
|
|
* to pages boundaries. So not much to do here. We return true anyway
|
|
* so acquire_sample_rows() in analyze.c would call our
|
|
* columnar_scan_analyze_next_tuple() callback.
|
|
*/
|
|
return true;
|
|
}
|
|
|
|
|
|
static bool
|
|
columnar_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin,
|
|
double *liverows, double *deadrows,
|
|
TupleTableSlot *slot)
|
|
{
|
|
/*
|
|
* Currently we don't do anything smart to reduce number of rows returned
|
|
* for ANALYZE. The TableAM API's ANALYZE functions are designed for page
|
|
* based access methods where it chooses random pages, and then reads
|
|
* tuples from those pages.
|
|
*
|
|
* We could do something like that here by choosing sample stripes or chunks,
|
|
* but getting that correct might need quite some work. Since columnar_fdw's
|
|
* ANALYZE scanned all rows, as a starter we do the same here and scan all
|
|
* rows.
|
|
*/
|
|
if (columnar_getnextslot(scan, ForwardScanDirection, slot))
|
|
{
|
|
(*liverows)++;
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
|
|
static double
|
|
columnar_index_build_range_scan(Relation columnarRelation,
|
|
Relation indexRelation,
|
|
IndexInfo *indexInfo,
|
|
bool allow_sync,
|
|
bool anyvisible,
|
|
bool progress,
|
|
BlockNumber start_blockno,
|
|
BlockNumber numblocks,
|
|
IndexBuildCallback callback,
|
|
void *callback_state,
|
|
TableScanDesc scan)
|
|
{
|
|
CheckCitusColumnarVersion(ERROR);
|
|
|
|
if (start_blockno != 0 || numblocks != InvalidBlockNumber)
|
|
{
|
|
/*
|
|
* Columnar utility hook already errors out for BRIN indexes on columnar
|
|
* tables, but be on the safe side.
|
|
*/
|
|
ereport(ERROR, (errmsg("BRIN indexes on columnar tables are not supported")));
|
|
}
|
|
|
|
if (scan)
|
|
{
|
|
/*
|
|
* Parallel scans on columnar tables are already discardad by
|
|
* ColumnarGetRelationInfoHook but be on the safe side.
|
|
*/
|
|
elog(ERROR, "parallel scans on columnar are not supported");
|
|
}
|
|
|
|
/*
|
|
* In a normal index build, we use SnapshotAny to retrieve all tuples. In
|
|
* a concurrent build or during bootstrap, we take a regular MVCC snapshot
|
|
* and index whatever's live according to that.
|
|
*/
|
|
TransactionId OldestXmin = InvalidTransactionId;
|
|
if (!IsBootstrapProcessingMode() && !indexInfo->ii_Concurrent)
|
|
{
|
|
/* ignore lazy VACUUM's */
|
|
OldestXmin = GetOldestNonRemovableTransactionId_compat(columnarRelation,
|
|
PROCARRAY_FLAGS_VACUUM);
|
|
}
|
|
|
|
Snapshot snapshot = { 0 };
|
|
bool snapshotRegisteredByUs = false;
|
|
|
|
/*
|
|
* For serial index build, we begin our own scan. We may also need to
|
|
* register a snapshot whose lifetime is under our direct control.
|
|
*/
|
|
if (!TransactionIdIsValid(OldestXmin))
|
|
{
|
|
snapshot = RegisterSnapshot(GetTransactionSnapshot());
|
|
snapshotRegisteredByUs = true;
|
|
}
|
|
else
|
|
{
|
|
snapshot = SnapshotAny;
|
|
}
|
|
|
|
int nkeys = 0;
|
|
ScanKeyData *scanKey = NULL;
|
|
bool allowAccessStrategy = true;
|
|
scan = table_beginscan_strat(columnarRelation, snapshot, nkeys, scanKey,
|
|
allowAccessStrategy, allow_sync);
|
|
|
|
if (progress)
|
|
{
|
|
ColumnarReportTotalVirtualBlocks(columnarRelation, snapshot,
|
|
PROGRESS_SCAN_BLOCKS_TOTAL);
|
|
}
|
|
|
|
/*
|
|
* Set up execution state for predicate, if any.
|
|
* Note that this is only useful for partial indexes.
|
|
*/
|
|
EState *estate = CreateExecutorState();
|
|
ExprContext *econtext = GetPerTupleExprContext(estate);
|
|
econtext->ecxt_scantuple = table_slot_create(columnarRelation, NULL);
|
|
ExprState *predicate = ExecPrepareQual(indexInfo->ii_Predicate, estate);
|
|
|
|
double reltuples = ColumnarReadRowsIntoIndex(scan, indexRelation, indexInfo,
|
|
progress, callback, callback_state,
|
|
estate, predicate);
|
|
table_endscan(scan);
|
|
|
|
if (progress)
|
|
{
|
|
/* report the last "virtual" block as "done" */
|
|
ColumnarReportTotalVirtualBlocks(columnarRelation, snapshot,
|
|
PROGRESS_SCAN_BLOCKS_DONE);
|
|
}
|
|
|
|
if (snapshotRegisteredByUs)
|
|
{
|
|
UnregisterSnapshot(snapshot);
|
|
}
|
|
|
|
ExecDropSingleTupleTableSlot(econtext->ecxt_scantuple);
|
|
FreeExecutorState(estate);
|
|
indexInfo->ii_ExpressionsState = NIL;
|
|
indexInfo->ii_PredicateState = NULL;
|
|
|
|
return reltuples;
|
|
}
|
|
|
|
|
|
/*
|
|
* ColumnarReportTotalVirtualBlocks reports progress for index build based on
|
|
* number of "virtual" blocks that given relation has.
|
|
* "progressArrIndex" argument determines which entry in st_progress_param
|
|
* array should be updated. In this case, we only expect PROGRESS_SCAN_BLOCKS_TOTAL
|
|
* or PROGRESS_SCAN_BLOCKS_DONE to specify whether we want to report calculated
|
|
* number of blocks as "done" or as "total" number of "virtual" blocks to scan.
|
|
*/
|
|
static void
|
|
ColumnarReportTotalVirtualBlocks(Relation relation, Snapshot snapshot,
|
|
int progressArrIndex)
|
|
{
|
|
/*
|
|
* Indeed, columnar tables might have gaps between row numbers, e.g
|
|
* due to aborted transactions etc. Also, ItemPointer BlockNumber's
|
|
* for columnar tables don't actually correspond to actual disk blocks
|
|
* as in heapAM. For this reason, we call them as "virtual" blocks. At
|
|
* the moment, we believe it is better to report our progress based on
|
|
* this "virtual" block concept instead of doing nothing.
|
|
*/
|
|
Assert(progressArrIndex == PROGRESS_SCAN_BLOCKS_TOTAL ||
|
|
progressArrIndex == PROGRESS_SCAN_BLOCKS_DONE);
|
|
BlockNumber nvirtualBlocks =
|
|
ColumnarGetNumberOfVirtualBlocks(relation, snapshot);
|
|
pgstat_progress_update_param(progressArrIndex, nvirtualBlocks);
|
|
}
|
|
|
|
|
|
/*
|
|
* ColumnarGetNumberOfVirtualBlocks returns total number of "virtual" blocks
|
|
* that given columnar table has based on based on ItemPointer BlockNumber's.
|
|
*/
|
|
static BlockNumber
|
|
ColumnarGetNumberOfVirtualBlocks(Relation relation, Snapshot snapshot)
|
|
{
|
|
ItemPointerData highestItemPointer =
|
|
ColumnarGetHighestItemPointer(relation, snapshot);
|
|
if (!ItemPointerIsValid(&highestItemPointer))
|
|
{
|
|
/* table is empty according to our snapshot */
|
|
return 0;
|
|
}
|
|
|
|
/*
|
|
* Since BlockNumber is 0-based, increment it by 1 to find the total
|
|
* number of "virtual" blocks.
|
|
*/
|
|
return ItemPointerGetBlockNumber(&highestItemPointer) + 1;
|
|
}
|
|
|
|
|
|
/*
|
|
* ColumnarGetHighestItemPointer returns ItemPointerData for the tuple with
|
|
* highest tid for given relation.
|
|
* If given relation is empty, then returns invalid item pointer.
|
|
*/
|
|
static ItemPointerData
|
|
ColumnarGetHighestItemPointer(Relation relation, Snapshot snapshot)
|
|
{
|
|
StripeMetadata *stripeWithHighestRowNumber =
|
|
FindStripeWithHighestRowNumber(relation, snapshot);
|
|
if (stripeWithHighestRowNumber == NULL ||
|
|
StripeGetHighestRowNumber(stripeWithHighestRowNumber) == 0)
|
|
{
|
|
/* table is empty according to our snapshot */
|
|
ItemPointerData invalidItemPtr;
|
|
ItemPointerSetInvalid(&invalidItemPtr);
|
|
return invalidItemPtr;
|
|
}
|
|
|
|
uint64 highestRowNumber = StripeGetHighestRowNumber(stripeWithHighestRowNumber);
|
|
return row_number_to_tid(highestRowNumber);
|
|
}
|
|
|
|
|
|
/*
|
|
* ColumnarReadRowsIntoIndex builds indexRelation tuples by reading the
|
|
* actual relation based on given "scan" and returns number of tuples
|
|
* scanned to build the indexRelation.
|
|
*/
|
|
static double
|
|
ColumnarReadRowsIntoIndex(TableScanDesc scan, Relation indexRelation,
|
|
IndexInfo *indexInfo, bool progress,
|
|
IndexBuildCallback indexCallback,
|
|
void *indexCallbackState, EState *estate,
|
|
ExprState *predicate)
|
|
{
|
|
double reltuples = 0;
|
|
|
|
BlockNumber lastReportedBlockNumber = InvalidBlockNumber;
|
|
|
|
ExprContext *econtext = GetPerTupleExprContext(estate);
|
|
TupleTableSlot *slot = econtext->ecxt_scantuple;
|
|
while (columnar_getnextslot(scan, ForwardScanDirection, slot))
|
|
{
|
|
CHECK_FOR_INTERRUPTS();
|
|
|
|
BlockNumber currentBlockNumber = ItemPointerGetBlockNumber(&slot->tts_tid);
|
|
if (progress && lastReportedBlockNumber != currentBlockNumber)
|
|
{
|
|
/*
|
|
* columnar_getnextslot guarantees that returned tuple will
|
|
* always have a greater ItemPointer than the ones we fetched
|
|
* before, so we directly use BlockNumber to report our progress.
|
|
*/
|
|
Assert(lastReportedBlockNumber == InvalidBlockNumber ||
|
|
currentBlockNumber >= lastReportedBlockNumber);
|
|
pgstat_progress_update_param(PROGRESS_SCAN_BLOCKS_DONE,
|
|
currentBlockNumber);
|
|
lastReportedBlockNumber = currentBlockNumber;
|
|
}
|
|
|
|
MemoryContextReset(econtext->ecxt_per_tuple_memory);
|
|
|
|
if (predicate != NULL && !ExecQual(predicate, econtext))
|
|
{
|
|
/* for partial indexes, discard tuples that don't satisfy the predicate */
|
|
continue;
|
|
}
|
|
|
|
Datum indexValues[INDEX_MAX_KEYS];
|
|
bool indexNulls[INDEX_MAX_KEYS];
|
|
FormIndexDatum(indexInfo, slot, estate, indexValues, indexNulls);
|
|
|
|
ItemPointerData itemPointerData = slot->tts_tid;
|
|
|
|
/* currently, columnar tables can't have dead tuples */
|
|
bool tupleIsAlive = true;
|
|
indexCallback(indexRelation, &itemPointerData, indexValues, indexNulls,
|
|
tupleIsAlive, indexCallbackState);
|
|
|
|
reltuples++;
|
|
}
|
|
|
|
return reltuples;
|
|
}
|
|
|
|
|
|
static void
|
|
columnar_index_validate_scan(Relation columnarRelation,
|
|
Relation indexRelation,
|
|
IndexInfo *indexInfo,
|
|
Snapshot snapshot,
|
|
ValidateIndexState *
|
|
validateIndexState)
|
|
{
|
|
CheckCitusColumnarVersion(ERROR);
|
|
|
|
ColumnarReportTotalVirtualBlocks(columnarRelation, snapshot,
|
|
PROGRESS_SCAN_BLOCKS_TOTAL);
|
|
|
|
/*
|
|
* Set up execution state for predicate, if any.
|
|
* Note that this is only useful for partial indexes.
|
|
*/
|
|
EState *estate = CreateExecutorState();
|
|
ExprContext *econtext = GetPerTupleExprContext(estate);
|
|
econtext->ecxt_scantuple = table_slot_create(columnarRelation, NULL);
|
|
ExprState *predicate = ExecPrepareQual(indexInfo->ii_Predicate, estate);
|
|
|
|
int nkeys = 0;
|
|
ScanKeyData *scanKey = NULL;
|
|
bool allowAccessStrategy = true;
|
|
bool allowSyncScan = false;
|
|
TableScanDesc scan = table_beginscan_strat(columnarRelation, snapshot, nkeys, scanKey,
|
|
allowAccessStrategy, allowSyncScan);
|
|
|
|
ColumnarReadMissingRowsIntoIndex(scan, indexRelation, indexInfo, estate,
|
|
predicate, validateIndexState);
|
|
|
|
table_endscan(scan);
|
|
|
|
/* report the last "virtual" block as "done" */
|
|
ColumnarReportTotalVirtualBlocks(columnarRelation, snapshot,
|
|
PROGRESS_SCAN_BLOCKS_DONE);
|
|
|
|
ExecDropSingleTupleTableSlot(econtext->ecxt_scantuple);
|
|
FreeExecutorState(estate);
|
|
indexInfo->ii_ExpressionsState = NIL;
|
|
indexInfo->ii_PredicateState = NULL;
|
|
}
|
|
|
|
|
|
/*
|
|
* ColumnarReadMissingRowsIntoIndex inserts the tuples that are not in
|
|
* the index yet by reading the actual relation based on given "scan".
|
|
*/
|
|
static void
|
|
ColumnarReadMissingRowsIntoIndex(TableScanDesc scan, Relation indexRelation,
|
|
IndexInfo *indexInfo, EState *estate,
|
|
ExprState *predicate,
|
|
ValidateIndexState *validateIndexState)
|
|
{
|
|
BlockNumber lastReportedBlockNumber = InvalidBlockNumber;
|
|
|
|
bool indexTupleSortEmpty = false;
|
|
ItemPointerData indexedItemPointerData;
|
|
ItemPointerSetInvalid(&indexedItemPointerData);
|
|
|
|
ExprContext *econtext = GetPerTupleExprContext(estate);
|
|
TupleTableSlot *slot = econtext->ecxt_scantuple;
|
|
while (columnar_getnextslot(scan, ForwardScanDirection, slot))
|
|
{
|
|
CHECK_FOR_INTERRUPTS();
|
|
|
|
ItemPointer columnarItemPointer = &slot->tts_tid;
|
|
BlockNumber currentBlockNumber = ItemPointerGetBlockNumber(columnarItemPointer);
|
|
if (lastReportedBlockNumber != currentBlockNumber)
|
|
{
|
|
/*
|
|
* columnar_getnextslot guarantees that returned tuple will
|
|
* always have a greater ItemPointer than the ones we fetched
|
|
* before, so we directly use BlockNumber to report our progress.
|
|
*/
|
|
Assert(lastReportedBlockNumber == InvalidBlockNumber ||
|
|
currentBlockNumber >= lastReportedBlockNumber);
|
|
pgstat_progress_update_param(PROGRESS_SCAN_BLOCKS_DONE,
|
|
currentBlockNumber);
|
|
lastReportedBlockNumber = currentBlockNumber;
|
|
}
|
|
|
|
validateIndexState->htups += 1;
|
|
|
|
if (!indexTupleSortEmpty &&
|
|
(!ItemPointerIsValid(&indexedItemPointerData) ||
|
|
ItemPointerCompare(&indexedItemPointerData, columnarItemPointer) < 0))
|
|
{
|
|
/*
|
|
* Skip indexed item pointers until we find or pass the current
|
|
* columnar relation item pointer.
|
|
*/
|
|
indexedItemPointerData =
|
|
TupleSortSkipSmallerItemPointers(validateIndexState->tuplesort,
|
|
columnarItemPointer);
|
|
indexTupleSortEmpty = !ItemPointerIsValid(&indexedItemPointerData);
|
|
}
|
|
|
|
if (!indexTupleSortEmpty &&
|
|
ItemPointerCompare(&indexedItemPointerData, columnarItemPointer) == 0)
|
|
{
|
|
/* tuple is already covered by the index, skip */
|
|
continue;
|
|
}
|
|
Assert(indexTupleSortEmpty ||
|
|
ItemPointerCompare(&indexedItemPointerData, columnarItemPointer) > 0);
|
|
|
|
MemoryContextReset(econtext->ecxt_per_tuple_memory);
|
|
|
|
if (predicate != NULL && !ExecQual(predicate, econtext))
|
|
{
|
|
/* for partial indexes, discard tuples that don't satisfy the predicate */
|
|
continue;
|
|
}
|
|
|
|
Datum indexValues[INDEX_MAX_KEYS];
|
|
bool indexNulls[INDEX_MAX_KEYS];
|
|
FormIndexDatum(indexInfo, slot, estate, indexValues, indexNulls);
|
|
|
|
Relation columnarRelation = scan->rs_rd;
|
|
IndexUniqueCheck indexUniqueCheck =
|
|
indexInfo->ii_Unique ? UNIQUE_CHECK_YES : UNIQUE_CHECK_NO;
|
|
index_insert_compat(indexRelation, indexValues, indexNulls, columnarItemPointer,
|
|
columnarRelation, indexUniqueCheck, false, indexInfo);
|
|
|
|
validateIndexState->tups_inserted += 1;
|
|
}
|
|
}
|
|
|
|
|
|
/*
|
|
* TupleSortSkipSmallerItemPointers iterates given tupleSort until finding an
|
|
* ItemPointer that is greater than or equal to given targetItemPointer and
|
|
* returns that ItemPointer.
|
|
* If such an ItemPointer does not exist, then returns invalid ItemPointer.
|
|
*
|
|
* Note that this function assumes given tupleSort doesn't have any NULL
|
|
* Datum's.
|
|
*/
|
|
static ItemPointerData
|
|
TupleSortSkipSmallerItemPointers(Tuplesortstate *tupleSort, ItemPointer targetItemPointer)
|
|
{
|
|
ItemPointerData tsItemPointerData;
|
|
ItemPointerSetInvalid(&tsItemPointerData);
|
|
|
|
while (!ItemPointerIsValid(&tsItemPointerData) ||
|
|
ItemPointerCompare(&tsItemPointerData, targetItemPointer) < 0)
|
|
{
|
|
bool forwardDirection = true;
|
|
Datum *abbrev = NULL;
|
|
Datum tsDatum;
|
|
bool tsDatumIsNull;
|
|
if (!tuplesort_getdatum(tupleSort, forwardDirection, &tsDatum,
|
|
&tsDatumIsNull, abbrev))
|
|
{
|
|
ItemPointerSetInvalid(&tsItemPointerData);
|
|
break;
|
|
}
|
|
|
|
Assert(!tsDatumIsNull);
|
|
itemptr_decode(&tsItemPointerData, DatumGetInt64(tsDatum));
|
|
|
|
#ifndef USE_FLOAT8_BYVAL
|
|
|
|
/*
|
|
* If int8 is pass-by-ref, we need to free Datum memory.
|
|
* See tuplesort_getdatum function's comment.
|
|
*/
|
|
pfree(DatumGetPointer(tsDatum));
|
|
#endif
|
|
}
|
|
|
|
return tsItemPointerData;
|
|
}
|
|
|
|
|
|
static uint64
|
|
columnar_relation_size(Relation rel, ForkNumber forkNumber)
|
|
{
|
|
CheckCitusColumnarVersion(ERROR);
|
|
|
|
uint64 nblocks = 0;
|
|
|
|
/* InvalidForkNumber indicates returning the size for all forks */
|
|
if (forkNumber == InvalidForkNumber)
|
|
{
|
|
for (int i = 0; i < MAX_FORKNUM; i++)
|
|
{
|
|
nblocks += smgrnblocks(RelationGetSmgr(rel), i);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
nblocks = smgrnblocks(RelationGetSmgr(rel), forkNumber);
|
|
}
|
|
|
|
return nblocks * BLCKSZ;
|
|
}
|
|
|
|
|
|
static bool
|
|
columnar_relation_needs_toast_table(Relation rel)
|
|
{
|
|
CheckCitusColumnarVersion(ERROR);
|
|
|
|
return false;
|
|
}
|
|
|
|
|
|
static void
|
|
columnar_estimate_rel_size(Relation rel, int32 *attr_widths,
|
|
BlockNumber *pages, double *tuples,
|
|
double *allvisfrac)
|
|
{
|
|
CheckCitusColumnarVersion(ERROR);
|
|
*pages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
|
|
*tuples = ColumnarTableRowCount(rel);
|
|
|
|
/*
|
|
* Append-only, so everything is visible except in-progress or rolled-back
|
|
* transactions.
|
|
*/
|
|
*allvisfrac = 1.0;
|
|
|
|
get_rel_data_width(rel, attr_widths);
|
|
}
|
|
|
|
|
|
static bool
|
|
columnar_scan_sample_next_block(TableScanDesc scan, SampleScanState *scanstate)
|
|
{
|
|
elog(ERROR, "columnar_scan_sample_next_block not implemented");
|
|
}
|
|
|
|
|
|
static bool
|
|
columnar_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate,
|
|
TupleTableSlot *slot)
|
|
{
|
|
elog(ERROR, "columnar_scan_sample_next_tuple not implemented");
|
|
}
|
|
|
|
|
|
static void
|
|
ColumnarXactCallback(XactEvent event, void *arg)
|
|
{
|
|
switch (event)
|
|
{
|
|
case XACT_EVENT_COMMIT:
|
|
case XACT_EVENT_PARALLEL_COMMIT:
|
|
case XACT_EVENT_PREPARE:
|
|
{
|
|
/* nothing to do */
|
|
break;
|
|
}
|
|
|
|
case XACT_EVENT_ABORT:
|
|
case XACT_EVENT_PARALLEL_ABORT:
|
|
{
|
|
DiscardWriteStateForAllRels(GetCurrentSubTransactionId(), 0);
|
|
break;
|
|
}
|
|
|
|
case XACT_EVENT_PRE_COMMIT:
|
|
case XACT_EVENT_PARALLEL_PRE_COMMIT:
|
|
case XACT_EVENT_PRE_PREPARE:
|
|
{
|
|
FlushWriteStateForAllRels(GetCurrentSubTransactionId(), 0);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
static void
|
|
ColumnarSubXactCallback(SubXactEvent event, SubTransactionId mySubid,
|
|
SubTransactionId parentSubid, void *arg)
|
|
{
|
|
switch (event)
|
|
{
|
|
case SUBXACT_EVENT_START_SUB:
|
|
case SUBXACT_EVENT_COMMIT_SUB:
|
|
{
|
|
/* nothing to do */
|
|
break;
|
|
}
|
|
|
|
case SUBXACT_EVENT_ABORT_SUB:
|
|
{
|
|
DiscardWriteStateForAllRels(mySubid, parentSubid);
|
|
break;
|
|
}
|
|
|
|
case SUBXACT_EVENT_PRE_COMMIT_SUB:
|
|
{
|
|
FlushWriteStateForAllRels(mySubid, parentSubid);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
void
|
|
columnar_tableam_init()
|
|
{
|
|
RegisterXactCallback(ColumnarXactCallback, NULL);
|
|
RegisterSubXactCallback(ColumnarSubXactCallback, NULL);
|
|
|
|
PrevObjectAccessHook = object_access_hook;
|
|
object_access_hook = ColumnarTableAMObjectAccessHook;
|
|
|
|
PrevProcessUtilityHook = ProcessUtility_hook ?
|
|
ProcessUtility_hook :
|
|
standard_ProcessUtility;
|
|
ProcessUtility_hook = ColumnarProcessUtility;
|
|
|
|
columnar_customscan_init();
|
|
|
|
TTSOpsColumnar = TTSOpsVirtual;
|
|
TTSOpsColumnar.copy_heap_tuple = ColumnarSlotCopyHeapTuple;
|
|
DefineCustomBoolVariable(
|
|
"columnar.enable_version_checks",
|
|
gettext_noop("Enables Version Check for Columnar"),
|
|
NULL,
|
|
&EnableVersionChecksColumnar,
|
|
true,
|
|
PGC_USERSET,
|
|
GUC_NO_SHOW_ALL,
|
|
NULL, NULL, NULL);
|
|
}
|
|
|
|
|
|
/*
|
|
* Get the number of chunks filtered out during the given scan.
|
|
*/
|
|
int64
|
|
ColumnarScanChunkGroupsFiltered(ColumnarScanDesc columnarScanDesc)
|
|
{
|
|
ColumnarReadState *readState = columnarScanDesc->cs_readState;
|
|
|
|
/* readState is initialized lazily */
|
|
if (readState != NULL)
|
|
{
|
|
return ColumnarReadChunkGroupsFiltered(readState);
|
|
}
|
|
else
|
|
{
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
|
|
/*
|
|
* Implementation of TupleTableSlotOps.copy_heap_tuple for TTSOpsColumnar.
|
|
*/
|
|
static HeapTuple
|
|
ColumnarSlotCopyHeapTuple(TupleTableSlot *slot)
|
|
{
|
|
Assert(!TTS_EMPTY(slot));
|
|
|
|
HeapTuple tuple = heap_form_tuple(slot->tts_tupleDescriptor,
|
|
slot->tts_values,
|
|
slot->tts_isnull);
|
|
|
|
/* slot->tts_tid is filled in columnar_getnextslot */
|
|
tuple->t_self = slot->tts_tid;
|
|
|
|
return tuple;
|
|
}
|
|
|
|
|
|
/*
|
|
* ColumnarTableDropHook
|
|
*
|
|
* Clean-up resources for columnar tables.
|
|
*/
|
|
static void
|
|
ColumnarTableDropHook(Oid relid)
|
|
{
|
|
/*
|
|
* Lock relation to prevent it from being dropped and to avoid
|
|
* race conditions in the next if block.
|
|
*/
|
|
LockRelationOid(relid, AccessShareLock);
|
|
|
|
if (IsColumnarTableAmTable(relid))
|
|
{
|
|
CheckCitusColumnarVersion(ERROR);
|
|
|
|
/*
|
|
* Drop metadata. No need to drop storage here since for
|
|
* tableam tables storage is managed by postgres.
|
|
*/
|
|
Relation rel = table_open(relid, AccessExclusiveLock);
|
|
RelFileNode relfilenode = rel->rd_node;
|
|
|
|
DeleteMetadataRows(relfilenode);
|
|
DeleteColumnarTableOptions(rel->rd_id, true);
|
|
|
|
MarkRelfilenodeDropped(relfilenode.relNode, GetCurrentSubTransactionId());
|
|
|
|
/* keep the lock since we did physical changes to the relation */
|
|
table_close(rel, NoLock);
|
|
}
|
|
}
|
|
|
|
|
|
/*
|
|
* Reject AFTER ... FOR EACH ROW triggers on columnar tables.
|
|
*/
|
|
static void
|
|
ColumnarTriggerCreateHook(Oid tgid)
|
|
{
|
|
/*
|
|
* Fetch the pg_trigger tuple by the Oid of the trigger
|
|
*/
|
|
ScanKeyData skey[1];
|
|
Relation tgrel = table_open(TriggerRelationId, AccessShareLock);
|
|
|
|
ScanKeyInit(&skey[0],
|
|
Anum_pg_trigger_oid,
|
|
BTEqualStrategyNumber, F_OIDEQ,
|
|
ObjectIdGetDatum(tgid));
|
|
|
|
SysScanDesc tgscan = systable_beginscan(tgrel, TriggerOidIndexId, true,
|
|
SnapshotSelf, 1, skey);
|
|
|
|
HeapTuple tgtup = systable_getnext(tgscan);
|
|
|
|
if (!HeapTupleIsValid(tgtup))
|
|
{
|
|
table_close(tgrel, AccessShareLock);
|
|
return;
|
|
}
|
|
|
|
Form_pg_trigger tgrec = (Form_pg_trigger) GETSTRUCT(tgtup);
|
|
|
|
Oid tgrelid = tgrec->tgrelid;
|
|
int16 tgtype = tgrec->tgtype;
|
|
|
|
systable_endscan(tgscan);
|
|
table_close(tgrel, AccessShareLock);
|
|
|
|
if (TRIGGER_FOR_ROW(tgtype) && TRIGGER_FOR_AFTER(tgtype) &&
|
|
IsColumnarTableAmTable(tgrelid))
|
|
{
|
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
errmsg(
|
|
"Foreign keys and AFTER ROW triggers are not supported for columnar tables"),
|
|
errhint("Consider an AFTER STATEMENT trigger instead.")));
|
|
}
|
|
}
|
|
|
|
|
|
/*
|
|
* Capture create/drop events and dispatch to the proper action.
|
|
*/
|
|
static void
|
|
ColumnarTableAMObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId,
|
|
int subId, void *arg)
|
|
{
|
|
if (PrevObjectAccessHook)
|
|
{
|
|
PrevObjectAccessHook(access, classId, objectId, subId, arg);
|
|
}
|
|
|
|
/* dispatch to the proper action */
|
|
if (access == OAT_DROP && classId == RelationRelationId && !OidIsValid(subId))
|
|
{
|
|
ColumnarTableDropHook(objectId);
|
|
}
|
|
else if (access == OAT_POST_CREATE && classId == TriggerRelationId)
|
|
{
|
|
ColumnarTriggerCreateHook(objectId);
|
|
}
|
|
}
|
|
|
|
|
|
/*
|
|
* ColumnarProcessAlterTable - if modifying a columnar table, extract columnar
|
|
* options and return the table's RangeVar.
|
|
*/
|
|
static RangeVar *
|
|
ColumnarProcessAlterTable(AlterTableStmt *alterTableStmt, List **columnarOptions)
|
|
{
|
|
RangeVar *columnarRangeVar = NULL;
|
|
Relation rel = relation_openrv_extended(alterTableStmt->relation, AccessShareLock,
|
|
alterTableStmt->missing_ok);
|
|
|
|
if (rel == NULL)
|
|
{
|
|
return NULL;
|
|
}
|
|
|
|
/* track separately in case of ALTER TABLE ... SET ACCESS METHOD */
|
|
bool srcIsColumnar = rel->rd_tableam == GetColumnarTableAmRoutine();
|
|
bool destIsColumnar = srcIsColumnar;
|
|
|
|
ListCell *lc = NULL;
|
|
foreach(lc, alterTableStmt->cmds)
|
|
{
|
|
AlterTableCmd *alterTableCmd = castNode(AlterTableCmd, lfirst(lc));
|
|
|
|
if (alterTableCmd->subtype == AT_SetRelOptions ||
|
|
alterTableCmd->subtype == AT_ResetRelOptions)
|
|
{
|
|
List *options = castNode(List, alterTableCmd->def);
|
|
|
|
alterTableCmd->def = (Node *) ExtractColumnarRelOptions(
|
|
options, columnarOptions);
|
|
|
|
if (destIsColumnar)
|
|
{
|
|
columnarRangeVar = alterTableStmt->relation;
|
|
}
|
|
}
|
|
#if PG_VERSION_NUM >= PG_VERSION_15
|
|
else if (alterTableCmd->subtype == AT_SetAccessMethod)
|
|
{
|
|
if (columnarRangeVar || *columnarOptions)
|
|
{
|
|
ereport(ERROR, (errmsg(
|
|
"ALTER TABLE cannot alter the access method after altering storage parameters"),
|
|
errhint(
|
|
"Specify SET ACCESS METHOD before storage parameters, or use separate ALTER TABLE commands.")));
|
|
}
|
|
|
|
destIsColumnar = (strcmp(alterTableCmd->name, COLUMNAR_AM_NAME) == 0);
|
|
|
|
if (srcIsColumnar && !destIsColumnar)
|
|
{
|
|
DeleteColumnarTableOptions(RelationGetRelid(rel), true);
|
|
}
|
|
}
|
|
#endif /* PG_VERSION_15 */
|
|
}
|
|
|
|
relation_close(rel, NoLock);
|
|
|
|
return columnarRangeVar;
|
|
}
|
|
|
|
|
|
/*
|
|
* Utility hook for columnar tables.
|
|
*/
|
|
static void
|
|
ColumnarProcessUtility(PlannedStmt *pstmt,
|
|
const char *queryString,
|
|
#if PG_VERSION_NUM >= PG_VERSION_14
|
|
bool readOnlyTree,
|
|
#endif
|
|
ProcessUtilityContext context,
|
|
ParamListInfo params,
|
|
struct QueryEnvironment *queryEnv,
|
|
DestReceiver *dest,
|
|
QueryCompletion *completionTag)
|
|
{
|
|
#if PG_VERSION_NUM >= PG_VERSION_14
|
|
if (readOnlyTree)
|
|
{
|
|
pstmt = copyObject(pstmt);
|
|
}
|
|
#endif
|
|
|
|
Node *parsetree = pstmt->utilityStmt;
|
|
|
|
RangeVar *columnarRangeVar = NULL;
|
|
List *columnarOptions = NIL;
|
|
|
|
switch (nodeTag(parsetree))
|
|
{
|
|
case T_IndexStmt:
|
|
{
|
|
IndexStmt *indexStmt = (IndexStmt *) parsetree;
|
|
|
|
Relation rel = relation_openrv(indexStmt->relation,
|
|
indexStmt->concurrent ?
|
|
ShareUpdateExclusiveLock :
|
|
ShareLock);
|
|
|
|
if (rel->rd_tableam == GetColumnarTableAmRoutine())
|
|
{
|
|
CheckCitusColumnarVersion(ERROR);
|
|
if (!ColumnarSupportsIndexAM(indexStmt->accessMethod))
|
|
{
|
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
errmsg("unsupported access method for the "
|
|
"index on columnar table %s",
|
|
RelationGetRelationName(rel))));
|
|
}
|
|
}
|
|
|
|
RelationClose(rel);
|
|
break;
|
|
}
|
|
|
|
case T_CreateStmt:
|
|
{
|
|
CreateStmt *createStmt = castNode(CreateStmt, parsetree);
|
|
bool no_op = false;
|
|
|
|
if (createStmt->if_not_exists)
|
|
{
|
|
Oid existing_relid;
|
|
|
|
/* use same check as transformCreateStmt */
|
|
(void) RangeVarGetAndCheckCreationNamespace(
|
|
createStmt->relation, AccessShareLock, &existing_relid);
|
|
|
|
no_op = OidIsValid(existing_relid);
|
|
}
|
|
|
|
if (!no_op && createStmt->accessMethod != NULL &&
|
|
!strcmp(createStmt->accessMethod, COLUMNAR_AM_NAME))
|
|
{
|
|
columnarRangeVar = createStmt->relation;
|
|
createStmt->options = ExtractColumnarRelOptions(createStmt->options,
|
|
&columnarOptions);
|
|
}
|
|
break;
|
|
}
|
|
|
|
case T_CreateTableAsStmt:
|
|
{
|
|
CreateTableAsStmt *createTableAsStmt = castNode(CreateTableAsStmt, parsetree);
|
|
IntoClause *into = createTableAsStmt->into;
|
|
bool no_op = false;
|
|
|
|
if (createTableAsStmt->if_not_exists)
|
|
{
|
|
Oid existing_relid;
|
|
|
|
/* use same check as transformCreateStmt */
|
|
(void) RangeVarGetAndCheckCreationNamespace(
|
|
into->rel, AccessShareLock, &existing_relid);
|
|
|
|
no_op = OidIsValid(existing_relid);
|
|
}
|
|
|
|
if (!no_op && into->accessMethod != NULL &&
|
|
!strcmp(into->accessMethod, COLUMNAR_AM_NAME))
|
|
{
|
|
columnarRangeVar = into->rel;
|
|
into->options = ExtractColumnarRelOptions(into->options,
|
|
&columnarOptions);
|
|
}
|
|
break;
|
|
}
|
|
|
|
case T_AlterTableStmt:
|
|
{
|
|
AlterTableStmt *alterTableStmt = castNode(AlterTableStmt, parsetree);
|
|
columnarRangeVar = ColumnarProcessAlterTable(alterTableStmt,
|
|
&columnarOptions);
|
|
break;
|
|
}
|
|
|
|
default:
|
|
|
|
/* FALL THROUGH */
|
|
break;
|
|
}
|
|
|
|
if (columnarOptions != NIL && columnarRangeVar == NULL)
|
|
{
|
|
ereport(ERROR,
|
|
(errmsg("columnar storage parameters specified on non-columnar table")));
|
|
}
|
|
|
|
if (IsA(parsetree, CreateExtensionStmt))
|
|
{
|
|
CheckCitusColumnarCreateExtensionStmt(parsetree);
|
|
}
|
|
|
|
if (IsA(parsetree, AlterExtensionStmt))
|
|
{
|
|
CheckCitusColumnarAlterExtensionStmt(parsetree);
|
|
}
|
|
|
|
PrevProcessUtilityHook_compat(pstmt, queryString, false, context,
|
|
params, queryEnv, dest, completionTag);
|
|
|
|
if (columnarOptions != NIL)
|
|
{
|
|
SetColumnarRelOptions(columnarRangeVar, columnarOptions);
|
|
}
|
|
}
|
|
|
|
|
|
/*
|
|
* ColumnarSupportsIndexAM returns true if indexAM with given name is
|
|
* supported by columnar tables.
|
|
*/
|
|
bool
|
|
ColumnarSupportsIndexAM(char *indexAMName)
|
|
{
|
|
return strncmp(indexAMName, "btree", NAMEDATALEN) == 0 ||
|
|
strncmp(indexAMName, "hash", NAMEDATALEN) == 0;
|
|
}
|
|
|
|
|
|
/*
|
|
* IsColumnarTableAmTable returns true if relation has columnar_tableam
|
|
* access method. This can be called before extension creation.
|
|
*/
|
|
bool
|
|
IsColumnarTableAmTable(Oid relationId)
|
|
{
|
|
if (!OidIsValid(relationId))
|
|
{
|
|
return false;
|
|
}
|
|
|
|
/*
|
|
* Lock relation to prevent it from being dropped &
|
|
* avoid race conditions.
|
|
*/
|
|
Relation rel = relation_open(relationId, AccessShareLock);
|
|
bool result = rel->rd_tableam == GetColumnarTableAmRoutine();
|
|
relation_close(rel, NoLock);
|
|
|
|
return result;
|
|
}
|
|
|
|
|
|
/*
|
|
* CheckCitusColumnarCreateExtensionStmt determines whether can install
|
|
* citus_columnar per given CREATE extension statment
|
|
*/
|
|
void
|
|
CheckCitusColumnarCreateExtensionStmt(Node *parseTree)
|
|
{
|
|
CreateExtensionStmt *createExtensionStmt = castNode(CreateExtensionStmt,
|
|
parseTree);
|
|
if (get_extension_oid("citus_columnar", true) == InvalidOid)
|
|
{
|
|
if (strcmp(createExtensionStmt->extname, "citus_columnar") == 0)
|
|
{
|
|
DefElem *newVersionValue = GetExtensionOption(
|
|
createExtensionStmt->options,
|
|
"new_version");
|
|
|
|
/*we are not allowed to install citus_columnar as version 11.1-0 by cx*/
|
|
if (newVersionValue)
|
|
{
|
|
const char *newVersion = defGetString(newVersionValue);
|
|
if (strcmp(newVersion, CITUS_COLUMNAR_INTERNAL_VERSION) == 0)
|
|
{
|
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
errmsg(
|
|
"unsupported citus_columnar version 11.1-0")));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/*
|
|
* CheckCitusColumnarAlterExtensionStmt determines whether can alter
|
|
* citus_columnar per given ALTER extension statment
|
|
*/
|
|
void
|
|
CheckCitusColumnarAlterExtensionStmt(Node *parseTree)
|
|
{
|
|
AlterExtensionStmt *alterExtensionStmt = castNode(AlterExtensionStmt, parseTree);
|
|
if (strcmp(alterExtensionStmt->extname, "citus_columnar") == 0)
|
|
{
|
|
DefElem *newVersionValue = GetExtensionOption(alterExtensionStmt->options,
|
|
"new_version");
|
|
|
|
/*we are not allowed cx to downgrade citus_columnar to 11.1-0*/
|
|
if (newVersionValue)
|
|
{
|
|
const char *newVersion = defGetString(newVersionValue);
|
|
if (strcmp(newVersion, CITUS_COLUMNAR_INTERNAL_VERSION) == 0)
|
|
{
|
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
errmsg("unsupported citus_columnar version 11.1-0")));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
static const TableAmRoutine columnar_am_methods = {
|
|
.type = T_TableAmRoutine,
|
|
|
|
.slot_callbacks = columnar_slot_callbacks,
|
|
|
|
.scan_begin = columnar_beginscan,
|
|
.scan_end = columnar_endscan,
|
|
.scan_rescan = columnar_rescan,
|
|
.scan_getnextslot = columnar_getnextslot,
|
|
|
|
.parallelscan_estimate = columnar_parallelscan_estimate,
|
|
.parallelscan_initialize = columnar_parallelscan_initialize,
|
|
.parallelscan_reinitialize = columnar_parallelscan_reinitialize,
|
|
|
|
.index_fetch_begin = columnar_index_fetch_begin,
|
|
.index_fetch_reset = columnar_index_fetch_reset,
|
|
.index_fetch_end = columnar_index_fetch_end,
|
|
.index_fetch_tuple = columnar_index_fetch_tuple,
|
|
|
|
.tuple_fetch_row_version = columnar_fetch_row_version,
|
|
.tuple_get_latest_tid = columnar_get_latest_tid,
|
|
.tuple_tid_valid = columnar_tuple_tid_valid,
|
|
.tuple_satisfies_snapshot = columnar_tuple_satisfies_snapshot,
|
|
#if PG_VERSION_NUM >= PG_VERSION_14
|
|
.index_delete_tuples = columnar_index_delete_tuples,
|
|
#else
|
|
.compute_xid_horizon_for_tuples = columnar_compute_xid_horizon_for_tuples,
|
|
#endif
|
|
|
|
.tuple_insert = columnar_tuple_insert,
|
|
.tuple_insert_speculative = columnar_tuple_insert_speculative,
|
|
.tuple_complete_speculative = columnar_tuple_complete_speculative,
|
|
.multi_insert = columnar_multi_insert,
|
|
.tuple_delete = columnar_tuple_delete,
|
|
.tuple_update = columnar_tuple_update,
|
|
.tuple_lock = columnar_tuple_lock,
|
|
.finish_bulk_insert = columnar_finish_bulk_insert,
|
|
|
|
.relation_set_new_filenode = columnar_relation_set_new_filenode,
|
|
.relation_nontransactional_truncate = columnar_relation_nontransactional_truncate,
|
|
.relation_copy_data = columnar_relation_copy_data,
|
|
.relation_copy_for_cluster = columnar_relation_copy_for_cluster,
|
|
.relation_vacuum = columnar_vacuum_rel,
|
|
.scan_analyze_next_block = columnar_scan_analyze_next_block,
|
|
.scan_analyze_next_tuple = columnar_scan_analyze_next_tuple,
|
|
.index_build_range_scan = columnar_index_build_range_scan,
|
|
.index_validate_scan = columnar_index_validate_scan,
|
|
|
|
.relation_size = columnar_relation_size,
|
|
.relation_needs_toast_table = columnar_relation_needs_toast_table,
|
|
|
|
.relation_estimate_size = columnar_estimate_rel_size,
|
|
|
|
.scan_bitmap_next_block = NULL,
|
|
.scan_bitmap_next_tuple = NULL,
|
|
.scan_sample_next_block = columnar_scan_sample_next_block,
|
|
.scan_sample_next_tuple = columnar_scan_sample_next_tuple
|
|
};
|
|
|
|
|
|
const TableAmRoutine *
|
|
GetColumnarTableAmRoutine(void)
|
|
{
|
|
return &columnar_am_methods;
|
|
}
|
|
|
|
|
|
PG_FUNCTION_INFO_V1(columnar_handler);
|
|
Datum
|
|
columnar_handler(PG_FUNCTION_ARGS)
|
|
{
|
|
PG_RETURN_POINTER(&columnar_am_methods);
|
|
}
|
|
|
|
|
|
/*
|
|
* detoast_values
|
|
*
|
|
* Detoast and decompress all values. If there's no work to do, return
|
|
* original pointer; otherwise return a newly-allocated values array. Should
|
|
* be called in per-tuple context.
|
|
*/
|
|
static Datum *
|
|
detoast_values(TupleDesc tupleDesc, Datum *orig_values, bool *isnull)
|
|
{
|
|
int natts = tupleDesc->natts;
|
|
|
|
/* copy on write to optimize for case where nothing is toasted */
|
|
Datum *values = orig_values;
|
|
|
|
for (int i = 0; i < tupleDesc->natts; i++)
|
|
{
|
|
if (!isnull[i] && tupleDesc->attrs[i].attlen == -1 &&
|
|
VARATT_IS_EXTENDED(values[i]))
|
|
{
|
|
/* make a copy */
|
|
if (values == orig_values)
|
|
{
|
|
values = palloc(sizeof(Datum) * natts);
|
|
|
|
/*
|
|
* We use IGNORE-BANNED here since we don't want to limit
|
|
* size of the buffer that holds the datum array to RSIZE_MAX
|
|
* unnecessarily.
|
|
*/
|
|
memcpy(values, orig_values, sizeof(Datum) * natts); /* IGNORE-BANNED */
|
|
}
|
|
|
|
/* will be freed when per-tuple context is reset */
|
|
struct varlena *new_value = (struct varlena *) DatumGetPointer(values[i]);
|
|
new_value = detoast_attr(new_value);
|
|
values[i] = PointerGetDatum(new_value);
|
|
}
|
|
}
|
|
|
|
return values;
|
|
}
|
|
|
|
|
|
/*
|
|
* ColumnarCheckLogicalReplication throws an error if the relation is
|
|
* part of any publication. This should be called before any write to
|
|
* a columnar table, because columnar changes are not replicated with
|
|
* logical replication (similar to a row table without a replica
|
|
* identity).
|
|
*/
|
|
static void
|
|
ColumnarCheckLogicalReplication(Relation rel)
|
|
{
|
|
bool pubActionInsert = false;
|
|
|
|
if (!is_publishable_relation(rel))
|
|
{
|
|
return;
|
|
}
|
|
|
|
#if PG_VERSION_NUM >= PG_VERSION_15
|
|
{
|
|
PublicationDesc pubdesc;
|
|
|
|
RelationBuildPublicationDesc(rel, &pubdesc);
|
|
pubActionInsert = pubdesc.pubactions.pubinsert;
|
|
}
|
|
#else
|
|
if (rel->rd_pubactions == NULL)
|
|
{
|
|
GetRelationPublicationActions(rel);
|
|
Assert(rel->rd_pubactions != NULL);
|
|
}
|
|
pubActionInsert = rel->rd_pubactions->pubinsert;
|
|
#endif
|
|
|
|
if (pubActionInsert)
|
|
{
|
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
errmsg(
|
|
"cannot insert into columnar table that is a part of a publication")));
|
|
}
|
|
}
|
|
|
|
|
|
/*
|
|
* alter_columnar_table_set()
|
|
*
|
|
* Deprecated in 11.1-1: should issue ALTER TABLE ... SET instead. Function
|
|
* still available, but implemented in PL/pgSQL instead of C.
|
|
*
|
|
* C code is removed -- the symbol may still be required in some
|
|
* upgrade/downgrade paths, but it should not be called.
|
|
*/
|
|
PG_FUNCTION_INFO_V1(alter_columnar_table_set);
|
|
Datum
|
|
alter_columnar_table_set(PG_FUNCTION_ARGS)
|
|
{
|
|
elog(ERROR, "alter_columnar_table_set is deprecated");
|
|
}
|
|
|
|
|
|
/*
|
|
* alter_columnar_table_reset()
|
|
*
|
|
* Deprecated in 11.1-1: should issue ALTER TABLE ... RESET instead. Function
|
|
* still available, but implemented in PL/pgSQL instead of C.
|
|
*
|
|
* C code is removed -- the symbol may still be required in some
|
|
* upgrade/downgrade paths, but it should not be called.
|
|
*/
|
|
PG_FUNCTION_INFO_V1(alter_columnar_table_reset);
|
|
Datum
|
|
alter_columnar_table_reset(PG_FUNCTION_ARGS)
|
|
{
|
|
elog(ERROR, "alter_columnar_table_reset is deprecated");
|
|
}
|
|
|
|
|
|
/*
|
|
* upgrade_columnar_storage - upgrade columnar storage to the current
|
|
* version.
|
|
*
|
|
* DDL:
|
|
* CREATE OR REPLACE FUNCTION upgrade_columnar_storage(rel regclass)
|
|
* RETURNS VOID
|
|
* STRICT
|
|
* LANGUAGE c AS 'MODULE_PATHNAME', 'upgrade_columnar_storage';
|
|
*/
|
|
PG_FUNCTION_INFO_V1(upgrade_columnar_storage);
|
|
Datum
|
|
upgrade_columnar_storage(PG_FUNCTION_ARGS)
|
|
{
|
|
Oid relid = PG_GETARG_OID(0);
|
|
|
|
/*
|
|
* ACCESS EXCLUSIVE LOCK is not required by the low-level routines, so we
|
|
* can take only an ACCESS SHARE LOCK. But all access to non-current
|
|
* columnar tables will fail anyway, so it's better to take ACCESS
|
|
* EXCLUSIVE LOCK now.
|
|
*/
|
|
Relation rel = table_open(relid, AccessExclusiveLock);
|
|
if (!IsColumnarTableAmTable(relid))
|
|
{
|
|
ereport(ERROR, (errmsg("table %s is not a columnar table",
|
|
quote_identifier(RelationGetRelationName(rel)))));
|
|
}
|
|
|
|
ColumnarStorageUpdateIfNeeded(rel, true);
|
|
|
|
table_close(rel, AccessExclusiveLock);
|
|
PG_RETURN_VOID();
|
|
}
|
|
|
|
|
|
/*
|
|
* downgrade_columnar_storage - downgrade columnar storage to the
|
|
* current version.
|
|
*
|
|
* DDL:
|
|
* CREATE OR REPLACE FUNCTION downgrade_columnar_storage(rel regclass)
|
|
* RETURNS VOID
|
|
* STRICT
|
|
* LANGUAGE c AS 'MODULE_PATHNAME', 'downgrade_columnar_storage';
|
|
*/
|
|
PG_FUNCTION_INFO_V1(downgrade_columnar_storage);
|
|
Datum
|
|
downgrade_columnar_storage(PG_FUNCTION_ARGS)
|
|
{
|
|
Oid relid = PG_GETARG_OID(0);
|
|
|
|
/*
|
|
* ACCESS EXCLUSIVE LOCK is not required by the low-level routines, so we
|
|
* can take only an ACCESS SHARE LOCK. But all access to non-current
|
|
* columnar tables will fail anyway, so it's better to take ACCESS
|
|
* EXCLUSIVE LOCK now.
|
|
*/
|
|
Relation rel = table_open(relid, AccessExclusiveLock);
|
|
if (!IsColumnarTableAmTable(relid))
|
|
{
|
|
ereport(ERROR, (errmsg("table %s is not a columnar table",
|
|
quote_identifier(RelationGetRelationName(rel)))));
|
|
}
|
|
|
|
ColumnarStorageUpdateIfNeeded(rel, false);
|
|
|
|
table_close(rel, AccessExclusiveLock);
|
|
PG_RETURN_VOID();
|
|
}
|
|
|
|
|
|
/*
|
|
* Code to check the Citus Version, helps remove dependency from Citus
|
|
*/
|
|
|
|
/*
|
|
* CitusColumnarHasBeenLoaded returns true if the citus extension has been created
|
|
* in the current database and the extension script has been executed. Otherwise,
|
|
* it returns false. The result is cached as this is called very frequently.
|
|
*/
|
|
bool
|
|
CitusColumnarHasBeenLoaded(void)
|
|
{
|
|
if (!extensionLoadedColumnar || creating_extension)
|
|
{
|
|
/*
|
|
* Refresh if we have not determined whether the extension has been
|
|
* loaded yet, or in case of ALTER EXTENSION since we want to treat
|
|
* Citus as "not loaded" during ALTER EXTENSION citus.
|
|
*/
|
|
bool extensionLoaded = CitusColumnarHasBeenLoadedInternal();
|
|
extensionLoadedColumnar = extensionLoaded;
|
|
}
|
|
|
|
return extensionLoadedColumnar;
|
|
}
|
|
|
|
|
|
/*
|
|
* CitusColumnarHasBeenLoadedInternal returns true if the citus extension has been created
|
|
* in the current database and the extension script has been executed. Otherwise,
|
|
* it returns false.
|
|
*/
|
|
static bool
|
|
CitusColumnarHasBeenLoadedInternal(void)
|
|
{
|
|
if (IsBinaryUpgrade)
|
|
{
|
|
/* never use Citus logic during pg_upgrade */
|
|
return false;
|
|
}
|
|
|
|
Oid citusExtensionOid = get_extension_oid("citus", true);
|
|
if (citusExtensionOid == InvalidOid)
|
|
{
|
|
/* Citus extension does not exist yet */
|
|
return false;
|
|
}
|
|
|
|
if (creating_extension && CurrentExtensionObject == citusExtensionOid)
|
|
{
|
|
/*
|
|
* We do not use Citus hooks during CREATE/ALTER EXTENSION citus
|
|
* since the objects used by the C code might be not be there yet.
|
|
*/
|
|
return false;
|
|
}
|
|
|
|
/* citus extension exists and has been created */
|
|
return true;
|
|
}
|
|
|
|
|
|
/*
|
|
* CheckCitusColumnarVersion checks whether there is a version mismatch between the
|
|
* available version and the loaded version or between the installed version
|
|
* and the loaded version. Returns true if compatible, false otherwise.
|
|
*
|
|
* As a side effect, this function also sets citusVersionKnownCompatible_Columnar global
|
|
* variable to true which reduces version check cost of next calls.
|
|
*/
|
|
bool
|
|
CheckCitusColumnarVersion(int elevel)
|
|
{
|
|
if (citusVersionKnownCompatibleColumnar ||
|
|
!CitusColumnarHasBeenLoaded() ||
|
|
!EnableVersionChecksColumnar)
|
|
{
|
|
return true;
|
|
}
|
|
|
|
if (CheckAvailableVersionColumnar(elevel) && CheckInstalledVersionColumnar(elevel))
|
|
{
|
|
citusVersionKnownCompatibleColumnar = true;
|
|
return true;
|
|
}
|
|
else
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
|
|
|
|
/*
|
|
* CheckAvailableVersion compares CITUS_EXTENSIONVERSION and the currently
|
|
* available version from the citus.control file. If they are not compatible,
|
|
* this function logs an error with the specified elevel and returns false,
|
|
* otherwise it returns true.
|
|
*/
|
|
bool
|
|
CheckAvailableVersionColumnar(int elevel)
|
|
{
|
|
if (!EnableVersionChecksColumnar)
|
|
{
|
|
return true;
|
|
}
|
|
|
|
char *availableVersion = AvailableExtensionVersionColumnar();
|
|
|
|
if (!MajorVersionsCompatibleColumnar(availableVersion, CITUS_EXTENSIONVERSION))
|
|
{
|
|
ereport(elevel, (errmsg("loaded Citus library version differs from latest "
|
|
"available extension version"),
|
|
errdetail("Loaded library requires %s, but the latest control "
|
|
"file specifies %s.", CITUS_MAJORVERSION,
|
|
availableVersion),
|
|
errhint("Restart the database to load the latest Citus "
|
|
"library.")));
|
|
pfree(availableVersion);
|
|
return false;
|
|
}
|
|
pfree(availableVersion);
|
|
return true;
|
|
}
|
|
|
|
|
|
/*
|
|
* CheckInstalledVersion compares CITUS_EXTENSIONVERSION and the
|
|
* extension's current version from the pg_extension catalog table. If they
|
|
* are not compatible, this function logs an error with the specified elevel,
|
|
* otherwise it returns true.
|
|
*/
|
|
static bool
|
|
CheckInstalledVersionColumnar(int elevel)
|
|
{
|
|
Assert(CitusColumnarHasBeenLoaded());
|
|
Assert(EnableVersionChecksColumnar);
|
|
|
|
char *installedVersion = InstalledExtensionVersionColumnar();
|
|
|
|
if (!MajorVersionsCompatibleColumnar(installedVersion, CITUS_EXTENSIONVERSION))
|
|
{
|
|
ereport(elevel, (errmsg("loaded Citus library version differs from installed "
|
|
"extension version"),
|
|
errdetail("Loaded library requires %s, but the installed "
|
|
"extension version is %s.", CITUS_MAJORVERSION,
|
|
installedVersion),
|
|
errhint("Run ALTER EXTENSION citus UPDATE and try again.")));
|
|
pfree(installedVersion);
|
|
return false;
|
|
}
|
|
pfree(installedVersion);
|
|
return true;
|
|
}
|
|
|
|
|
|
/*
|
|
* MajorVersionsCompatible checks whether both versions are compatible. They
|
|
* are if major and minor version numbers match, the schema version is
|
|
* ignored. Returns true if compatible, false otherwise.
|
|
*/
|
|
bool
|
|
MajorVersionsCompatibleColumnar(char *leftVersion, char *rightVersion)
|
|
{
|
|
const char schemaVersionSeparator = '-';
|
|
|
|
char *leftSeperatorPosition = strchr(leftVersion, schemaVersionSeparator);
|
|
char *rightSeperatorPosition = strchr(rightVersion, schemaVersionSeparator);
|
|
int leftComparisionLimit = 0;
|
|
int rightComparisionLimit = 0;
|
|
|
|
if (leftSeperatorPosition != NULL)
|
|
{
|
|
leftComparisionLimit = leftSeperatorPosition - leftVersion;
|
|
}
|
|
else
|
|
{
|
|
leftComparisionLimit = strlen(leftVersion);
|
|
}
|
|
|
|
if (rightSeperatorPosition != NULL)
|
|
{
|
|
rightComparisionLimit = rightSeperatorPosition - rightVersion;
|
|
}
|
|
else
|
|
{
|
|
rightComparisionLimit = strlen(leftVersion);
|
|
}
|
|
|
|
/* we can error out early if hypens are not in the same position */
|
|
if (leftComparisionLimit != rightComparisionLimit)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
return strncmp(leftVersion, rightVersion, leftComparisionLimit) == 0;
|
|
}
|
|
|
|
|
|
/*
|
|
* AvailableExtensionVersion returns the Citus version from citus.control file. It also
|
|
* saves the result, thus consecutive calls to CitusExtensionAvailableVersion will
|
|
* not read the citus.control file again.
|
|
*/
|
|
static char *
|
|
AvailableExtensionVersionColumnar(void)
|
|
{
|
|
LOCAL_FCINFO(fcinfo, 0);
|
|
FmgrInfo flinfo;
|
|
|
|
bool goForward = true;
|
|
bool doCopy = false;
|
|
char *availableExtensionVersion;
|
|
|
|
EState *estate = CreateExecutorState();
|
|
ReturnSetInfo *extensionsResultSet = makeNode(ReturnSetInfo);
|
|
extensionsResultSet->econtext = GetPerTupleExprContext(estate);
|
|
extensionsResultSet->allowedModes = SFRM_Materialize;
|
|
|
|
fmgr_info(F_PG_AVAILABLE_EXTENSIONS, &flinfo);
|
|
InitFunctionCallInfoData(*fcinfo, &flinfo, 0, InvalidOid, NULL,
|
|
(Node *) extensionsResultSet);
|
|
|
|
/* pg_available_extensions returns result set containing all available extensions */
|
|
(*pg_available_extensions)(fcinfo);
|
|
|
|
TupleTableSlot *tupleTableSlot = MakeSingleTupleTableSlot(
|
|
extensionsResultSet->setDesc,
|
|
&TTSOpsMinimalTuple);
|
|
bool hasTuple = tuplestore_gettupleslot(extensionsResultSet->setResult, goForward,
|
|
doCopy,
|
|
tupleTableSlot);
|
|
while (hasTuple)
|
|
{
|
|
bool isNull = false;
|
|
|
|
Datum extensionNameDatum = slot_getattr(tupleTableSlot, 1, &isNull);
|
|
char *extensionName = NameStr(*DatumGetName(extensionNameDatum));
|
|
if (strcmp(extensionName, "citus") == 0)
|
|
{
|
|
Datum availableVersion = slot_getattr(tupleTableSlot, 2, &isNull);
|
|
|
|
|
|
availableExtensionVersion = text_to_cstring(DatumGetTextPP(availableVersion));
|
|
|
|
|
|
ExecClearTuple(tupleTableSlot);
|
|
ExecDropSingleTupleTableSlot(tupleTableSlot);
|
|
|
|
return availableExtensionVersion;
|
|
}
|
|
|
|
ExecClearTuple(tupleTableSlot);
|
|
hasTuple = tuplestore_gettupleslot(extensionsResultSet->setResult, goForward,
|
|
doCopy, tupleTableSlot);
|
|
}
|
|
|
|
ExecDropSingleTupleTableSlot(tupleTableSlot);
|
|
|
|
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("citus extension is not found")));
|
|
}
|
|
|
|
|
|
/*
|
|
* InstalledExtensionVersion returns the Citus version in PostgreSQL pg_extension table.
|
|
*/
|
|
static char *
|
|
InstalledExtensionVersionColumnar(void)
|
|
{
|
|
ScanKeyData entry[1];
|
|
char *installedExtensionVersion = NULL;
|
|
|
|
Relation relation = table_open(ExtensionRelationId, AccessShareLock);
|
|
|
|
ScanKeyInit(&entry[0], Anum_pg_extension_extname, BTEqualStrategyNumber, F_NAMEEQ,
|
|
CStringGetDatum("citus"));
|
|
|
|
SysScanDesc scandesc = systable_beginscan(relation, ExtensionNameIndexId, true,
|
|
NULL, 1, entry);
|
|
|
|
HeapTuple extensionTuple = systable_getnext(scandesc);
|
|
|
|
/* We assume that there can be at most one matching tuple */
|
|
if (HeapTupleIsValid(extensionTuple))
|
|
{
|
|
int extensionIndex = Anum_pg_extension_extversion;
|
|
TupleDesc tupleDescriptor = RelationGetDescr(relation);
|
|
bool isNull = false;
|
|
|
|
Datum installedVersion = heap_getattr(extensionTuple, extensionIndex,
|
|
tupleDescriptor, &isNull);
|
|
|
|
if (isNull)
|
|
{
|
|
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("citus extension version is null")));
|
|
}
|
|
|
|
|
|
installedExtensionVersion = text_to_cstring(DatumGetTextPP(installedVersion));
|
|
}
|
|
else
|
|
{
|
|
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("citus extension is not loaded")));
|
|
}
|
|
|
|
systable_endscan(scandesc);
|
|
|
|
table_close(relation, AccessShareLock);
|
|
|
|
return installedExtensionVersion;
|
|
}
|
|
|
|
|
|
/*
|
|
* GetExtensionOption returns DefElem * node with "defname" from "options" list
|
|
*/
|
|
DefElem *
|
|
GetExtensionOption(List *extensionOptions, const char *defname)
|
|
{
|
|
DefElem *defElement = NULL;
|
|
foreach_ptr(defElement, extensionOptions)
|
|
{
|
|
if (IsA(defElement, DefElem) &&
|
|
strncmp(defElement->defname, defname, NAMEDATALEN) == 0)
|
|
{
|
|
return defElement;
|
|
}
|
|
}
|
|
|
|
return NULL;
|
|
}
|