/* Mirror of https://github.com/citusdata/citus.git (362 lines, 11 KiB, C) */
/*-------------------------------------------------------------------------
 *
 * columnar.h
 *
 * Type and function declarations for Columnar
 *
 * Copyright (c) 2016, Citus Data, Inc.
 *
 * $Id$
 *
 *-------------------------------------------------------------------------
 */
|
|
|
|
#ifndef COLUMNAR_H
|
|
#define COLUMNAR_H
|
|
#include "postgres.h"
|
|
|
|
#include "fmgr.h"
|
|
#include "lib/stringinfo.h"
|
|
#include "nodes/parsenodes.h"
|
|
#include "storage/bufpage.h"
|
|
#include "storage/lockdefs.h"
|
|
#include "storage/relfilenode.h"
|
|
#include "utils/relcache.h"
|
|
#include "utils/snapmgr.h"
|
|
|
|
/* Names of the valid columnar table options */
#define OPTION_NAME_COMPRESSION_TYPE "compression"
#define OPTION_NAME_STRIPE_ROW_COUNT "stripe_row_limit"
#define OPTION_NAME_CHUNK_ROW_COUNT "chunk_group_row_limit"
|
|
|
|
/* Lower and upper limits for option parameters */
#define STRIPE_ROW_COUNT_MINIMUM 1000
#define STRIPE_ROW_COUNT_MAXIMUM 10000000
#define CHUNK_ROW_COUNT_MINIMUM 1000
#define CHUNK_ROW_COUNT_MAXIMUM 100000
#define COMPRESSION_LEVEL_MIN 1
#define COMPRESSION_LEVEL_MAX 19
|
|
|
|
/* Columnar file signature: major and minor version numbers */
#define COLUMNAR_VERSION_MAJOR 1
#define COLUMNAR_VERSION_MINOR 7
|
|
|
|
/* Miscellaneous defines */
#define COLUMNAR_TUPLE_COST_MULTIPLIER 10
#define COLUMNAR_POSTSCRIPT_SIZE_LENGTH 1
#define COLUMNAR_POSTSCRIPT_SIZE_MAX 256

/* Bytes of a block that remain once the page header is subtracted */
#define COLUMNAR_BYTES_PER_PAGE (BLCKSZ - SizeOfPageHeaderData)
|
|
|
|
/* Enumeration for a columnar table's compression method */
typedef enum
{
	COMPRESSION_TYPE_INVALID = -1,  /* not a valid compression type */
	COMPRESSION_NONE = 0,
	COMPRESSION_PG_LZ = 1,
	COMPRESSION_LZ4 = 2,
	COMPRESSION_ZSTD = 3,

	/* one past the last explicitly numbered type; keep this entry last */
	COMPRESSION_COUNT
} CompressionType;
|
|
|
|
|
|
/*
|
|
* ColumnarOptions holds the option values to be used when reading or writing
|
|
* a columnar table. To resolve these values, we first check foreign table's options,
|
|
* and if not present, we then fall back to the default values specified above.
|
|
*/
|
|
typedef struct ColumnarOptions
|
|
{
|
|
uint64 stripeRowCount;
|
|
uint32 chunkRowCount;
|
|
CompressionType compressionType;
|
|
int compressionLevel;
|
|
} ColumnarOptions;
|
|
|
|
|
|
/*
|
|
* ColumnarTableDDLContext holds the instance variable for the TableDDLCommandFunction
|
|
* instance described below.
|
|
*/
|
|
typedef struct ColumnarTableDDLContext
|
|
{
|
|
char *schemaName;
|
|
char *relationName;
|
|
ColumnarOptions options;
|
|
} ColumnarTableDDLContext;
|
|
|
|
|
|
/*
|
|
* StripeMetadata represents information about a stripe. This information is
|
|
* stored in the metadata table "columnar.stripe".
|
|
*/
|
|
typedef struct StripeMetadata
|
|
{
|
|
uint64 fileOffset;
|
|
uint64 dataLength;
|
|
uint32 columnCount;
|
|
uint32 chunkCount;
|
|
uint32 chunkRowCount;
|
|
uint64 rowCount;
|
|
uint64 id;
|
|
} StripeMetadata;
|
|
|
|
|
|
/* ColumnChunkSkipNode contains statistics for a ColumnChunkData. */
|
|
typedef struct ColumnChunkSkipNode
|
|
{
|
|
/* statistics about values of a column chunk */
|
|
bool hasMinMax;
|
|
Datum minimumValue;
|
|
Datum maximumValue;
|
|
uint64 rowCount;
|
|
|
|
/*
|
|
* Offsets and sizes of value and exists streams in the column data.
|
|
* These enable us to skip reading suppressed row chunks, and start reading
|
|
* a chunk without reading previous chunks.
|
|
*/
|
|
uint64 valueChunkOffset;
|
|
uint64 valueLength;
|
|
uint64 existsChunkOffset;
|
|
uint64 existsLength;
|
|
|
|
/*
|
|
* This is used for (1) determining destination size when decompressing,
|
|
* (2) calculating compression rates when logging stats.
|
|
*/
|
|
uint64 decompressedValueSize;
|
|
|
|
CompressionType valueCompressionType;
|
|
int valueCompressionLevel;
|
|
} ColumnChunkSkipNode;
|
|
|
|
|
|
/*
|
|
* StripeSkipList can be used for skipping row chunks. It contains a column chunk
|
|
* skip node for each chunk of each column. chunkSkipNodeArray[column][chunk]
|
|
* is the entry for the specified column chunk.
|
|
*/
|
|
typedef struct StripeSkipList
|
|
{
|
|
ColumnChunkSkipNode **chunkSkipNodeArray;
|
|
uint32 columnCount;
|
|
uint32 chunkCount;
|
|
} StripeSkipList;
|
|
|
|
|
|
/*
|
|
* ChunkData represents a chunk of data for multiple columns. valueArray stores
|
|
* the values of data, and existsArray stores whether a value is present.
|
|
* valueBuffer is used to store (uncompressed) serialized values
|
|
* referenced by Datum's in valueArray. It is only used for by-reference Datum's.
|
|
* There is a one-to-one correspondence between valueArray and existsArray.
|
|
*/
|
|
typedef struct ChunkData
|
|
{
|
|
uint32 rowCount;
|
|
uint32 columnCount;
|
|
|
|
/*
|
|
* Following are indexed by [column][row]. If a column is not projected,
|
|
* then existsArray[column] and valueArray[column] are NULL.
|
|
*/
|
|
bool **existsArray;
|
|
Datum **valueArray;
|
|
|
|
/* valueBuffer keeps actual data for type-by-reference datums from valueArray. */
|
|
StringInfo *valueBufferArray;
|
|
} ChunkData;
|
|
|
|
|
|
/*
|
|
* ColumnChunkBuffers represents a chunk of serialized data in a column.
|
|
* valueBuffer stores the serialized values of data, and existsBuffer stores
|
|
* serialized value of presence information. valueCompressionType contains
|
|
* compression type if valueBuffer is compressed. Finally rowCount has
|
|
* the number of rows in this chunk.
|
|
*/
|
|
typedef struct ColumnChunkBuffers
|
|
{
|
|
StringInfo existsBuffer;
|
|
StringInfo valueBuffer;
|
|
CompressionType valueCompressionType;
|
|
uint64 decompressedValueSize;
|
|
} ColumnChunkBuffers;
|
|
|
|
|
|
/*
|
|
* ColumnBuffers represents data buffers for a column in a row stripe. Each
|
|
* column is made of multiple column chunks.
|
|
*/
|
|
typedef struct ColumnBuffers
|
|
{
|
|
ColumnChunkBuffers **chunkBuffersArray;
|
|
} ColumnBuffers;
|
|
|
|
|
|
/* StripeBuffers represents data for a row stripe. */
|
|
typedef struct StripeBuffers
|
|
{
|
|
uint32 columnCount;
|
|
uint32 rowCount;
|
|
ColumnBuffers **columnBuffersArray;
|
|
|
|
/*
|
|
* We might skip reading some chunks because they're refuted by the
|
|
* WHERE clause. We keep number of selected chunks and number of rows
|
|
* in each of them.
|
|
*/
|
|
uint32 selectedChunks;
|
|
uint32 *selectedChunkRowCount;
|
|
} StripeBuffers;
|
|
|
|
|
|
/*
 * TableReadState represents state of a columnar scan. Only the type name is
 * declared here; the struct body is not part of this header.
 */
struct TableReadState;
typedef struct TableReadState TableReadState;

/*
 * TableWriteState represents state of a columnar write operation. Only the
 * type name is declared here; the struct body is not part of this header.
 */
struct TableWriteState;
typedef struct TableWriteState TableWriteState;
|
|
|
|
extern int columnar_compression;
|
|
extern int columnar_stripe_row_limit;
|
|
extern int columnar_chunk_group_row_limit;
|
|
extern int columnar_compression_level;
|
|
|
|
extern void columnar_init_gucs(void);
|
|
|
|
extern CompressionType ParseCompressionType(const char *compressionTypeString);
|
|
|
|
/* Function declarations for writing to a columnar table */
|
|
extern TableWriteState * ColumnarBeginWrite(RelFileNode relfilenode,
|
|
ColumnarOptions options,
|
|
TupleDesc tupleDescriptor);
|
|
extern void ColumnarWriteRow(TableWriteState *state, Datum *columnValues,
|
|
bool *columnNulls);
|
|
extern void ColumnarFlushPendingWrites(TableWriteState *state);
|
|
extern void ColumnarEndWrite(TableWriteState *state);
|
|
extern bool ContainsPendingWrites(TableWriteState *state);
|
|
extern MemoryContext ColumnarWritePerTupleContext(TableWriteState *state);
|
|
|
|
/* Function declarations for reading from columnar table */
|
|
extern TableReadState * ColumnarBeginRead(Relation relation,
|
|
TupleDesc tupleDescriptor,
|
|
List *projectedColumnList,
|
|
List *qualConditions);
|
|
extern bool ColumnarReadNextRow(TableReadState *state, Datum *columnValues,
|
|
bool *columnNulls);
|
|
extern void ColumnarRescan(TableReadState *readState);
|
|
extern void ColumnarEndRead(TableReadState *state);
|
|
extern int64 ColumnarReadChunkGroupsFiltered(TableReadState *state);
|
|
|
|
/* Function declarations for common functions */
|
|
extern FmgrInfo * GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId,
|
|
int16 procedureId);
|
|
extern ChunkData * CreateEmptyChunkData(uint32 columnCount, bool *columnMask,
|
|
uint32 chunkRowCount);
|
|
extern void FreeChunkData(ChunkData *chunkData);
|
|
extern uint64 ColumnarTableRowCount(Relation relation);
|
|
extern bool CompressBuffer(StringInfo inputBuffer,
|
|
StringInfo outputBuffer,
|
|
CompressionType compressionType,
|
|
int compressionLevel);
|
|
extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType,
|
|
uint64 decompressedSize);
|
|
extern const char * CompressionTypeStr(CompressionType type);
|
|
|
|
/* columnar_metadata_tables.c */
|
|
extern void InitColumnarOptions(Oid regclass);
|
|
extern void SetColumnarOptions(Oid regclass, ColumnarOptions *options);
|
|
extern bool DeleteColumnarTableOptions(Oid regclass, bool missingOk);
|
|
extern bool ReadColumnarOptions(Oid regclass, ColumnarOptions *options);
|
|
extern void WriteToSmgr(Relation relation, uint64 logicalOffset,
|
|
char *data, uint32 dataLength);
|
|
extern StringInfo ReadFromSmgr(Relation rel, uint64 offset, uint32 size);
|
|
extern bool IsColumnarTableAmTable(Oid relationId);
|
|
|
|
/* columnar_metadata_tables.c */
|
|
extern void DeleteMetadataRows(RelFileNode relfilenode);
|
|
extern List * StripesForRelfilenode(RelFileNode relfilenode);
|
|
extern uint64 GetHighestUsedAddress(RelFileNode relfilenode);
|
|
extern StripeMetadata ReserveStripe(Relation rel, uint64 size,
|
|
uint64 rowCount, uint64 columnCount,
|
|
uint64 chunkCount, uint64 chunkRowCount);
|
|
extern void SaveStripeSkipList(RelFileNode relfilenode, uint64 stripe,
|
|
StripeSkipList *stripeSkipList,
|
|
TupleDesc tupleDescriptor);
|
|
extern void SaveChunkGroups(RelFileNode relfilenode, uint64 stripe,
|
|
List *chunkGroupRowCounts);
|
|
extern StripeSkipList * ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe,
|
|
TupleDesc tupleDescriptor,
|
|
uint32 chunkCount);
|
|
extern Datum columnar_relation_storageid(PG_FUNCTION_ARGS);
|
|
|
|
|
|
/* write_state_management.c */
|
|
extern TableWriteState * columnar_init_write_state(Relation relation, TupleDesc
|
|
tupdesc,
|
|
SubTransactionId currentSubXid);
|
|
extern void FlushWriteStateForRelfilenode(Oid relfilenode, SubTransactionId
|
|
currentSubXid);
|
|
extern void FlushWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId
|
|
parentSubXid);
|
|
extern void DiscardWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId
|
|
parentSubXid);
|
|
extern void MarkRelfilenodeDropped(Oid relfilenode, SubTransactionId currentSubXid);
|
|
extern void NonTransactionDropWriteState(Oid relfilenode);
|
|
extern bool PendingWritesInUpperTransactions(Oid relfilenode,
|
|
SubTransactionId currentSubXid);
|
|
extern MemoryContext GetWriteContextForDebug(void);
|
|
|
|
typedef struct SmgrAddr
|
|
{
|
|
BlockNumber blockno;
|
|
uint32 offset;
|
|
} SmgrAddr;
|
|
|
|
/*
|
|
* Map logical offsets (as tracked in the metadata) to a physical page and
|
|
* offset where the data is kept.
|
|
*/
|
|
static inline SmgrAddr
|
|
logical_to_smgr(uint64 logicalOffset)
|
|
{
|
|
SmgrAddr addr;
|
|
|
|
addr.blockno = logicalOffset / COLUMNAR_BYTES_PER_PAGE;
|
|
addr.offset = SizeOfPageHeaderData + (logicalOffset % COLUMNAR_BYTES_PER_PAGE);
|
|
|
|
return addr;
|
|
}
|
|
|
|
|
|
/*
|
|
* Map a physical page adnd offset address to a logical address.
|
|
*/
|
|
static inline uint64
|
|
smgr_to_logical(SmgrAddr addr)
|
|
{
|
|
return COLUMNAR_BYTES_PER_PAGE * addr.blockno + addr.offset - SizeOfPageHeaderData;
|
|
}
|
|
|
|
|
|
/*
|
|
* Get the first usable address of next block.
|
|
*/
|
|
static inline SmgrAddr
|
|
next_block_start(SmgrAddr addr)
|
|
{
|
|
SmgrAddr result = {
|
|
.blockno = addr.blockno + 1,
|
|
.offset = SizeOfPageHeaderData
|
|
};
|
|
|
|
return result;
|
|
}
|
|
|
|
|
|
#endif /* COLUMNAR_H */
|