mirror of https://github.com/citusdata/citus.git
Columnar WAL
parent
27d9957139
commit
aef5737421
|
@ -23,6 +23,7 @@
|
||||||
#include "citus_version.h"
|
#include "citus_version.h"
|
||||||
#include "columnar/columnar.h"
|
#include "columnar/columnar.h"
|
||||||
#include "columnar/columnar_tableam.h"
|
#include "columnar/columnar_tableam.h"
|
||||||
|
#include "columnar/columnar_wal.h"
|
||||||
|
|
||||||
/* Default values for option parameters */
|
/* Default values for option parameters */
|
||||||
#define DEFAULT_STRIPE_ROW_COUNT 150000
|
#define DEFAULT_STRIPE_ROW_COUNT 150000
|
||||||
|
@ -57,6 +58,7 @@ static const struct config_enum_entry columnar_compression_options[] =
|
||||||
void
|
void
|
||||||
columnar_init(void)
|
columnar_init(void)
|
||||||
{
|
{
|
||||||
|
columnar_wal_init();
|
||||||
columnar_init_gucs();
|
columnar_init_gucs();
|
||||||
columnar_tableam_init();
|
columnar_tableam_init();
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,6 +58,7 @@
|
||||||
#include "columnar/columnar_storage.h"
|
#include "columnar/columnar_storage.h"
|
||||||
#include "columnar/columnar_tableam.h"
|
#include "columnar/columnar_tableam.h"
|
||||||
#include "columnar/columnar_version_compat.h"
|
#include "columnar/columnar_version_compat.h"
|
||||||
|
#include "columnar/columnar_wal.h"
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/listutils.h"
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -757,6 +758,8 @@ columnar_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
|
||||||
uint64 writtenRowNumber = ColumnarWriteRow(writeState, values, slot->tts_isnull);
|
uint64 writtenRowNumber = ColumnarWriteRow(writeState, values, slot->tts_isnull);
|
||||||
slot->tts_tid = row_number_to_tid(writtenRowNumber);
|
slot->tts_tid = row_number_to_tid(writtenRowNumber);
|
||||||
|
|
||||||
|
columnar_wal_insert(relation, slot, options);
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
MemoryContextReset(ColumnarWritePerTupleContext(writeState));
|
MemoryContextReset(ColumnarWritePerTupleContext(writeState));
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,371 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* columnar_wal.c
|
||||||
|
*
|
||||||
|
* Copyright (c) 2016, Citus Data, Inc.
|
||||||
|
*
|
||||||
|
* $Id$
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "postgres.h"
|
||||||
|
|
||||||
|
#include "access/bufmask.h"
|
||||||
|
#include "access/heapam.h"
|
||||||
|
#include "access/heapam_xlog.h"
|
||||||
|
#include "access/htup_details.h"
|
||||||
|
#include "access/rmgr.h"
|
||||||
|
#include "access/xlog_internal.h"
|
||||||
|
#include "access/xloginsert.h"
|
||||||
|
#include "catalog/catalog.h"
|
||||||
|
#include "columnar/columnar_wal.h"
|
||||||
|
#include "executor/tuptable.h"
|
||||||
|
#include "miscadmin.h"
|
||||||
|
#include "replication/decode.h"
|
||||||
|
#include "replication/logical.h"
|
||||||
|
#include "replication/reorderbuffer.h"
|
||||||
|
#include "safe_lib.h"
|
||||||
|
#include "storage/bufmgr.h"
|
||||||
|
#include "storage/relfilenode.h"
|
||||||
|
#include "utils/relcache.h"
|
||||||
|
|
||||||
|
#define RM_COLUMNAR_ID 241
|
||||||
|
#define RM_COLUMNAR_NAME "columnar"
|
||||||
|
|
||||||
|
#define XLOG_COLUMNAR_PAGE_OVERWRITE 0
|
||||||
|
#define XLOG_COLUMNAR_PAGE_APPEND 1
|
||||||
|
#define XLOG_COLUMNAR_INSERT 2
|
||||||
|
|
||||||
|
typedef struct xl_columnar_insert
|
||||||
|
{
|
||||||
|
RelFileNode target_node;
|
||||||
|
OffsetNumber offnum; /* inserted tuple's offset */
|
||||||
|
uint16 t_infomask2;
|
||||||
|
uint16 t_infomask;
|
||||||
|
uint8 flags;
|
||||||
|
uint8 t_hoff;
|
||||||
|
} xl_columnar_insert;
|
||||||
|
|
||||||
|
#define SizeofColumnarInsert (offsetof(xl_columnar_insert, t_hoff) + sizeof(uint8))
|
||||||
|
|
||||||
|
static void columnar_redo_page_overwrite(XLogReaderState *record);
|
||||||
|
static void columnar_redo_page_append(XLogReaderState *record);
|
||||||
|
static void columnar_redo_insert(XLogReaderState *record);
|
||||||
|
|
||||||
|
static void columnar_redo(XLogReaderState *record);
|
||||||
|
static void columnar_desc(StringInfo buf, XLogReaderState *record);
|
||||||
|
static const char *columnar_identify(uint8 info);
|
||||||
|
static void columnar_startup(void);
|
||||||
|
static void columnar_cleanup(void);
|
||||||
|
static void columnar_mask(char *pagedata, BlockNumber blkno);
|
||||||
|
static void columnar_decode(struct LogicalDecodingContext *ctx,
|
||||||
|
struct XLogRecordBuffer *buf);
|
||||||
|
|
||||||
|
static RmgrData ColumnarRmgr = {
|
||||||
|
.rm_name = RM_COLUMNAR_NAME,
|
||||||
|
.rm_redo = columnar_redo,
|
||||||
|
.rm_desc = columnar_desc,
|
||||||
|
.rm_identify = columnar_identify,
|
||||||
|
.rm_startup = columnar_startup,
|
||||||
|
.rm_cleanup = columnar_cleanup,
|
||||||
|
.rm_mask = columnar_mask,
|
||||||
|
.rm_decode = columnar_decode
|
||||||
|
};
|
||||||
|
|
||||||
|
void
|
||||||
|
columnar_wal_init()
|
||||||
|
{
|
||||||
|
elog(LOG, "columnar_wal_init");
|
||||||
|
RegisterCustomRmgr(RM_COLUMNAR_ID, &ColumnarRmgr);
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
columnar_wal_page_overwrite(Relation rel, BlockNumber blockno, char *buf,
|
||||||
|
uint32 len)
|
||||||
|
{
|
||||||
|
XLogRecPtr recptr;
|
||||||
|
Buffer buffer = ReadBuffer(rel, blockno);
|
||||||
|
|
||||||
|
elog(LOG, "columnar_wal_page_overwrite");
|
||||||
|
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
|
||||||
|
|
||||||
|
START_CRIT_SECTION();
|
||||||
|
|
||||||
|
Page page = BufferGetPage(buffer);
|
||||||
|
|
||||||
|
PageHeader phdr = (PageHeader) page;
|
||||||
|
PageInit(page, BLCKSZ, 0);
|
||||||
|
|
||||||
|
if (len > phdr->pd_upper - phdr->pd_lower)
|
||||||
|
{
|
||||||
|
elog(ERROR,
|
||||||
|
"write of columnar data of length %d exceeds page size on block %d of relation %d",
|
||||||
|
len, blockno, rel->rd_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy_s(page + phdr->pd_lower, phdr->pd_upper - phdr->pd_lower, buf, len);
|
||||||
|
phdr->pd_lower += len;
|
||||||
|
|
||||||
|
XLogBeginInsert();
|
||||||
|
XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | REGBUF_FORCE_IMAGE);
|
||||||
|
recptr = XLogInsert(RM_COLUMNAR_ID, XLOG_COLUMNAR_PAGE_OVERWRITE);
|
||||||
|
PageSetLSN(page, recptr);
|
||||||
|
|
||||||
|
END_CRIT_SECTION();
|
||||||
|
UnlockReleaseBuffer(buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
columnar_wal_page_append(Relation rel, BlockNumber blockno, uint32 offset,
|
||||||
|
char *buf, uint32 len)
|
||||||
|
{
|
||||||
|
XLogRecPtr recptr;
|
||||||
|
Buffer buffer = ReadBuffer(rel, blockno);
|
||||||
|
|
||||||
|
elog(LOG, "columnar_wal_page_append");
|
||||||
|
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
|
||||||
|
|
||||||
|
START_CRIT_SECTION();
|
||||||
|
|
||||||
|
Page page = BufferGetPage(buffer);
|
||||||
|
|
||||||
|
PageHeader phdr = (PageHeader) page;
|
||||||
|
if (PageIsNew(page))
|
||||||
|
{
|
||||||
|
PageInit(page, BLCKSZ, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (offset != phdr->pd_lower)
|
||||||
|
{
|
||||||
|
elog(ERROR,
|
||||||
|
"append of columnar data at offset %d does not match pd_lower %d of block %d relation %d",
|
||||||
|
offset, phdr->pd_lower, blockno, rel->rd_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (len > phdr->pd_upper - offset)
|
||||||
|
{
|
||||||
|
elog(ERROR,
|
||||||
|
"append of columnar data of length %d to offset %d exceeds free space of block %d relation %d",
|
||||||
|
len, offset, blockno, rel->rd_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy_s(page + phdr->pd_lower, phdr->pd_upper - phdr->pd_lower, buf, len);
|
||||||
|
phdr->pd_lower += len;
|
||||||
|
|
||||||
|
XLogBeginInsert();
|
||||||
|
XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | REGBUF_FORCE_IMAGE);
|
||||||
|
recptr = XLogInsert(RM_COLUMNAR_ID, XLOG_COLUMNAR_PAGE_APPEND);
|
||||||
|
PageSetLSN(page, recptr);
|
||||||
|
|
||||||
|
END_CRIT_SECTION();
|
||||||
|
UnlockReleaseBuffer(buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
columnar_wal_insert(Relation rel, TupleTableSlot *slot, int options)
|
||||||
|
{
|
||||||
|
bool shouldFree = true;
|
||||||
|
HeapTuple heaptup = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
|
||||||
|
xl_columnar_insert xlrec;
|
||||||
|
|
||||||
|
if (options & HEAP_INSERT_NO_LOGICAL)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
elog(LOG, "columnar_wal_insert");
|
||||||
|
memcpy(&xlrec.target_node, &rel->rd_node, sizeof(RelFileNode));
|
||||||
|
|
||||||
|
xlrec.flags = 0;
|
||||||
|
xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE;
|
||||||
|
|
||||||
|
if (options & HEAP_INSERT_SPECULATIVE)
|
||||||
|
{
|
||||||
|
xlrec.flags |= XLH_INSERT_IS_SPECULATIVE;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IsToastRelation(rel))
|
||||||
|
{
|
||||||
|
xlrec.flags |= XLH_INSERT_ON_TOAST_RELATION;
|
||||||
|
}
|
||||||
|
|
||||||
|
xlrec.t_infomask2 = heaptup->t_data->t_infomask2;
|
||||||
|
xlrec.t_infomask = heaptup->t_data->t_infomask;
|
||||||
|
xlrec.t_hoff = heaptup->t_data->t_hoff;
|
||||||
|
|
||||||
|
XLogBeginInsert();
|
||||||
|
|
||||||
|
XLogRegisterData((char *) &xlrec, SizeofColumnarInsert);
|
||||||
|
XLogRegisterData((char *) heaptup->t_data + SizeofHeapTupleHeader,
|
||||||
|
heaptup->t_len - SizeofHeapTupleHeader);
|
||||||
|
|
||||||
|
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
|
||||||
|
|
||||||
|
/* not associated with any buffer, so ignore returned LSN */
|
||||||
|
(void) XLogInsert(RM_COLUMNAR_ID, XLOG_COLUMNAR_INSERT);
|
||||||
|
|
||||||
|
if (shouldFree)
|
||||||
|
{
|
||||||
|
pfree(heaptup);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Full page images are always forced, so this is a no-op.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
columnar_redo_page_overwrite(XLogReaderState *record)
|
||||||
|
{
|
||||||
|
elog(LOG, "columnar_redo_page_overwrite");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Full page images are always forced, so this is a no-op.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
columnar_redo_page_append(XLogReaderState *record)
|
||||||
|
{
|
||||||
|
elog(LOG, "columnar_redo_page_append");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Columnar insert records are only for logical decoding. Nothing needs to
|
||||||
|
* be done for redo.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
columnar_redo_insert(XLogReaderState *record)
|
||||||
|
{
|
||||||
|
elog(LOG, "columnar_redo_insert");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
columnar_redo(XLogReaderState *record)
|
||||||
|
{
|
||||||
|
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
|
||||||
|
|
||||||
|
elog(LOG, "columnar_redo");
|
||||||
|
switch (info)
|
||||||
|
{
|
||||||
|
case XLOG_COLUMNAR_PAGE_OVERWRITE:
|
||||||
|
columnar_redo_page_overwrite(record);
|
||||||
|
break;
|
||||||
|
case XLOG_COLUMNAR_PAGE_APPEND:
|
||||||
|
columnar_redo_page_append(record);
|
||||||
|
break;
|
||||||
|
case XLOG_COLUMNAR_INSERT:
|
||||||
|
columnar_redo_insert(record);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
elog(PANIC, "columnar_redo: unknown op code %u", info);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
columnar_desc(StringInfo buf, XLogReaderState *record)
|
||||||
|
{
|
||||||
|
elog(LOG, "columnar_redo");
|
||||||
|
}
|
||||||
|
|
||||||
|
static const char *
|
||||||
|
columnar_identify(uint8 info)
|
||||||
|
{
|
||||||
|
elog(LOG, "columnar_identify");
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
columnar_startup(void)
|
||||||
|
{
|
||||||
|
elog(LOG, "columnar_rm_startup()");
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
columnar_cleanup(void)
|
||||||
|
{
|
||||||
|
elog(LOG, "columnar_rm_cleanup()");
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
columnar_mask(char *pagedata, BlockNumber blkno)
|
||||||
|
{
|
||||||
|
elog(LOG, "columnar_mask");
|
||||||
|
mask_page_lsn_and_checksum(pagedata);
|
||||||
|
|
||||||
|
mask_unused_space(pagedata);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
columnar_decode(struct LogicalDecodingContext *ctx,
|
||||||
|
struct XLogRecordBuffer *buf)
|
||||||
|
{
|
||||||
|
Size datalen;
|
||||||
|
char *tupledata;
|
||||||
|
Size tuplelen;
|
||||||
|
XLogReaderState *r = buf->record;
|
||||||
|
xl_columnar_insert *xlrec;
|
||||||
|
ReorderBufferChange *change;
|
||||||
|
|
||||||
|
elog(LOG, "columnar_decode");
|
||||||
|
xlrec = (xl_columnar_insert *) XLogRecGetData(r);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Ignore insert records without new tuples (this does happen when
|
||||||
|
* raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
|
||||||
|
*/
|
||||||
|
if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
|
||||||
|
return;
|
||||||
|
|
||||||
|
/* only interested in our database */
|
||||||
|
if (xlrec->target_node.dbNode != ctx->slot->data.database)
|
||||||
|
return;
|
||||||
|
|
||||||
|
/* output plugin doesn't look for this origin, no need to queue */
|
||||||
|
|
||||||
|
if (ctx->callbacks.filter_by_origin_cb != NULL &&
|
||||||
|
filter_by_origin_cb_wrapper(ctx, XLogRecGetOrigin(r)))
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
change = ReorderBufferGetChange(ctx->reorder);
|
||||||
|
if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
|
||||||
|
change->action = REORDER_BUFFER_CHANGE_INSERT;
|
||||||
|
else
|
||||||
|
change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
|
||||||
|
change->origin_id = XLogRecGetOrigin(r);
|
||||||
|
|
||||||
|
memcpy(&change->data.tp.relnode, &xlrec->target_node,
|
||||||
|
sizeof(RelFileNode));
|
||||||
|
|
||||||
|
datalen = XLogRecGetDataLen(r);
|
||||||
|
tupledata = XLogRecGetData(r) + SizeofColumnarInsert;
|
||||||
|
tuplelen = datalen - SizeofColumnarInsert;
|
||||||
|
|
||||||
|
change->data.tp.newtuple =
|
||||||
|
ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
|
||||||
|
|
||||||
|
ReorderBufferTupleBuf *newtuple = change->data.tp.newtuple;
|
||||||
|
newtuple->tuple.t_len = tuplelen + SizeofHeapTupleHeader;
|
||||||
|
HeapTupleHeader header = newtuple->tuple.t_data;
|
||||||
|
ItemPointerSetInvalid(&newtuple->tuple.t_self);
|
||||||
|
newtuple->tuple.t_tableOid = InvalidOid;
|
||||||
|
memset(header, 0, SizeofHeapTupleHeader);
|
||||||
|
memcpy((char *) newtuple->tuple.t_data + SizeofHeapTupleHeader,
|
||||||
|
tupledata, tuplelen);
|
||||||
|
|
||||||
|
header->t_infomask = xlrec->t_infomask;
|
||||||
|
header->t_infomask2 = xlrec->t_infomask2;
|
||||||
|
header->t_hoff = xlrec->t_hoff;
|
||||||
|
|
||||||
|
change->data.tp.clear_toast_afterwards = true;
|
||||||
|
|
||||||
|
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
|
||||||
|
change,
|
||||||
|
xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
#include "columnar/columnar.h"
|
#include "columnar/columnar.h"
|
||||||
#include "columnar/columnar_tableam.h"
|
#include "columnar/columnar_tableam.h"
|
||||||
|
#include "columnar/columnar_wal.h"
|
||||||
|
|
||||||
|
|
||||||
PG_MODULE_MAGIC;
|
PG_MODULE_MAGIC;
|
||||||
|
|
|
@ -0,0 +1,23 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* columnar_wal.h
|
||||||
|
*
|
||||||
|
* Type and function declarations for Columnar
|
||||||
|
*
|
||||||
|
* Copyright (c) Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef COLUMNAR_WAL_H
|
||||||
|
#define COLUMNAR_WAL_H
|
||||||
|
|
||||||
|
extern void columnar_wal_init(void);
|
||||||
|
extern void columnar_wal_page_overwrite(Relation rel, BlockNumber blockno,
|
||||||
|
char *buf, uint32 len);
|
||||||
|
extern void columnar_wal_page_append(Relation rel, BlockNumber blockno,
|
||||||
|
uint32 offset, char *buf, uint32 len);
|
||||||
|
extern void columnar_wal_insert(Relation rel, TupleTableSlot *slot,
|
||||||
|
int options);
|
||||||
|
|
||||||
|
#endif /* COLUMNAR_WAL_H */
|
Loading…
Reference in New Issue