diff --git a/src/backend/columnar/columnar.c b/src/backend/columnar/columnar.c
index 85ec06d00..b28a1cee9 100644
--- a/src/backend/columnar/columnar.c
+++ b/src/backend/columnar/columnar.c
@@ -23,6 +23,7 @@
 #include "citus_version.h"
 #include "columnar/columnar.h"
 #include "columnar/columnar_tableam.h"
+#include "columnar/columnar_wal.h"
 
 /* Default values for option parameters */
 #define DEFAULT_STRIPE_ROW_COUNT 150000
@@ -57,6 +58,7 @@ static const struct config_enum_entry columnar_compression_options[] =
 void
 columnar_init(void)
 {
+	columnar_wal_init();
 	columnar_init_gucs();
 	columnar_tableam_init();
 }
diff --git a/src/backend/columnar/columnar_tableam.c b/src/backend/columnar/columnar_tableam.c
index 333231dec..3e1dd8b30 100644
--- a/src/backend/columnar/columnar_tableam.c
+++ b/src/backend/columnar/columnar_tableam.c
@@ -58,6 +58,7 @@
 #include "columnar/columnar_storage.h"
 #include "columnar/columnar_tableam.h"
 #include "columnar/columnar_version_compat.h"
+#include "columnar/columnar_wal.h"
 #include "distributed/listutils.h"
 
 /*
@@ -757,6 +758,8 @@ columnar_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
 	uint64 writtenRowNumber = ColumnarWriteRow(writeState, values, slot->tts_isnull);
 	slot->tts_tid = row_number_to_tid(writtenRowNumber);
 
+	columnar_wal_insert(relation, slot, options);
+
 	MemoryContextSwitchTo(oldContext);
 	MemoryContextReset(ColumnarWritePerTupleContext(writeState));
 }
diff --git a/src/backend/columnar/columnar_wal.c b/src/backend/columnar/columnar_wal.c
new file mode 100644
index 000000000..d31398490
--- /dev/null
+++ b/src/backend/columnar/columnar_wal.c
@@ -0,0 +1,558 @@
+/*-------------------------------------------------------------------------
+ *
+ * columnar_wal.c
+ *
+ * Custom WAL resource manager for columnar storage. Page records carry a
+ * forced full-page image for physical recovery; insert records carry the
+ * inserted tuple and exist only for logical decoding.
+ *
+ * Copyright (c) 2016, Citus Data, Inc.
+ *
+ * $Id$
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/bufmask.h"
+#include "access/heapam.h"
+#include "access/heapam_xlog.h"
+#include "access/htup_details.h"
+#include "access/rmgr.h"
+#include "access/xlog_internal.h"
+#include "access/xloginsert.h"
+#include "access/xlogutils.h"
+#include "catalog/catalog.h"
+#include "columnar/columnar_wal.h"
+#include "executor/tuptable.h"
+#include "miscadmin.h"
+#include "replication/decode.h"
+#include "replication/logical.h"
+#include "replication/reorderbuffer.h"
+#include "replication/snapbuild.h"
+#include "safe_lib.h"
+#include "storage/bufmgr.h"
+#include "storage/relfilenode.h"
+#include "utils/rel.h"
+#include "utils/relcache.h"
+
+#define RM_COLUMNAR_ID 241
+#define RM_COLUMNAR_NAME "columnar"
+
+/*
+ * Record types. XLogInsert() only lets a resource manager use the high four
+ * bits of xl_info (XLR_RMGR_INFO_MASK), and every reader masks the low bits
+ * off with ~XLR_INFO_MASK, so the codes must be multiples of 0x10 to survive
+ * the round trip.
+ */
+#define XLOG_COLUMNAR_PAGE_OVERWRITE 0x00
+#define XLOG_COLUMNAR_PAGE_APPEND 0x10
+#define XLOG_COLUMNAR_INSERT 0x20
+
+/*
+ * Main data of an XLOG_COLUMNAR_INSERT record. The tuple body, i.e. the
+ * bytes after the fixed-size heap tuple header, follows directly behind it.
+ */
+typedef struct xl_columnar_insert
+{
+	RelFileNode target_node;
+	OffsetNumber offnum; /* always written as zero; not read by decoding */
+	uint16 t_infomask2;
+	uint16 t_infomask;
+	uint8 flags; /* XLH_INSERT_* bits */
+	uint8 t_hoff;
+} xl_columnar_insert;
+
+#define SizeofColumnarInsert (offsetof(xl_columnar_insert, t_hoff) + sizeof(uint8))
+
+static void columnar_redo_restore_block(XLogReaderState *record);
+static void columnar_redo_page_overwrite(XLogReaderState *record);
+static void columnar_redo_page_append(XLogReaderState *record);
+static void columnar_redo_insert(XLogReaderState *record);
+
+static void columnar_redo(XLogReaderState *record);
+static void columnar_desc(StringInfo buf, XLogReaderState *record);
+static const char * columnar_identify(uint8 info);
+static void columnar_startup(void);
+static void columnar_cleanup(void);
+static void columnar_mask(char *pagedata, BlockNumber blkno);
+static void columnar_decode(struct LogicalDecodingContext *ctx,
+							struct XLogRecordBuffer *buf);
+
+static RmgrData ColumnarRmgr = {
+	.rm_name = RM_COLUMNAR_NAME,
+	.rm_redo = columnar_redo,
+	.rm_desc = columnar_desc,
+	.rm_identify = columnar_identify,
+	.rm_startup = columnar_startup,
+	.rm_cleanup = columnar_cleanup,
+	.rm_mask = columnar_mask,
+	.rm_decode = columnar_decode
+};
+
+/*
+ * columnar_wal_init registers the columnar resource manager with PostgreSQL.
+ *
+ * NOTE(review): custom resource managers are expected to be registered while
+ * shared_preload_libraries is being processed -- confirm the extension is
+ * always loaded that way.
+ */
+void
+columnar_wal_init(void)
+{
+	elog(DEBUG1, "columnar_wal_init");
+	RegisterCustomRmgr(RM_COLUMNAR_ID, &ColumnarRmgr);
+}
+
+/*
+ * columnar_wal_page_overwrite replaces the contents of block blockno of rel
+ * with the len bytes at buf and WAL-logs the result as a full-page image.
+ *
+ * The page is re-initialized, so anything it held before is discarded. Raises
+ * an ERROR, before touching the page, when len does not fit into an empty
+ * page.
+ */
+void
+columnar_wal_page_overwrite(Relation rel, BlockNumber blockno, char *buf,
+							uint32 len)
+{
+	/* what PageInit(page, BLCKSZ, 0) leaves between pd_lower and pd_upper */
+	const uint32 capacity = BLCKSZ - SizeOfPageHeaderData;
+
+	/*
+	 * Validate before entering the critical section: an ERROR raised inside
+	 * one is promoted to PANIC and would take the whole server down.
+	 */
+	if (len > capacity)
+	{
+		elog(ERROR,
+			 "write of columnar data of length %u exceeds page size on block %u of relation %u",
+			 len, blockno, rel->rd_id);
+	}
+
+	Buffer buffer = ReadBuffer(rel, blockno);
+	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+	Page page = BufferGetPage(buffer);
+	PageHeader phdr = (PageHeader) page;
+
+	START_CRIT_SECTION();
+
+	PageInit(page, BLCKSZ, 0);
+
+	/* skip empty copies: memcpy_s presumably rejects a zero length */
+	if (len > 0)
+	{
+		memcpy_s(page + phdr->pd_lower, phdr->pd_upper - phdr->pd_lower, buf, len);
+		phdr->pd_lower += len;
+	}
+
+	/* the buffer must be dirtied before its WAL record is inserted */
+	MarkBufferDirty(buffer);
+
+	XLogBeginInsert();
+	XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | REGBUF_FORCE_IMAGE);
+	XLogRecPtr recptr = XLogInsert(RM_COLUMNAR_ID, XLOG_COLUMNAR_PAGE_OVERWRITE);
+	PageSetLSN(page, recptr);
+
+	END_CRIT_SECTION();
+	UnlockReleaseBuffer(buffer);
+}
+
+/*
+ * columnar_wal_page_append copies the len bytes at buf to position offset of
+ * block blockno of rel and WAL-logs the result as a full-page image.
+ *
+ * offset has to equal the page's current pd_lower, i.e. data can only be
+ * appended directly behind what is already stored; a page that was never
+ * initialized is initialized first. Raises an ERROR, before touching the
+ * page, when offset is wrong or the data does not fit into the free space.
+ */
+void
+columnar_wal_page_append(Relation rel, BlockNumber blockno, uint32 offset,
+						 char *buf, uint32 len)
+{
+	Buffer buffer = ReadBuffer(rel, blockno);
+	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+	Page page = BufferGetPage(buffer);
+	PageHeader phdr = (PageHeader) page;
+
+	/* bounds the page has now, or will have once PageInit() has run */
+	bool initPage = PageIsNew(page);
+	uint32 lower = initPage ? SizeOfPageHeaderData : phdr->pd_lower;
+	uint32 upper = initPage ? BLCKSZ : phdr->pd_upper;
+
+	/*
+	 * Validate before entering the critical section: an ERROR raised inside
+	 * one is promoted to PANIC. Error cleanup is expected to release the
+	 * buffer lock and pin still held here.
+	 */
+	if (offset != lower)
+	{
+		elog(ERROR,
+			 "append of columnar data at offset %u does not match pd_lower %u of block %u relation %u",
+			 offset, lower, blockno, rel->rd_id);
+	}
+
+	if (len > upper - offset)
+	{
+		elog(ERROR,
+			 "append of columnar data of length %u to offset %u exceeds free space of block %u relation %u",
+			 len, offset, blockno, rel->rd_id);
+	}
+
+	START_CRIT_SECTION();
+
+	if (initPage)
+	{
+		PageInit(page, BLCKSZ, 0);
+	}
+
+	/* skip empty copies: memcpy_s presumably rejects a zero length */
+	if (len > 0)
+	{
+		memcpy_s(page + phdr->pd_lower, phdr->pd_upper - phdr->pd_lower, buf, len);
+		phdr->pd_lower += len;
+	}
+
+	/* the buffer must be dirtied before its WAL record is inserted */
+	MarkBufferDirty(buffer);
+
+	XLogBeginInsert();
+	XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | REGBUF_FORCE_IMAGE);
+	XLogRecPtr recptr = XLogInsert(RM_COLUMNAR_ID, XLOG_COLUMNAR_PAGE_APPEND);
+	PageSetLSN(page, recptr);
+
+	END_CRIT_SECTION();
+	UnlockReleaseBuffer(buffer);
+}
+
+/*
+ * columnar_wal_insert emits an XLOG_COLUMNAR_INSERT record holding the tuple
+ * in slot, so that logical decoding can replay the insert into rel.
+ *
+ * Nothing is written when the caller passed HEAP_INSERT_NO_LOGICAL or when
+ * rel is not logically logged, since the record has no other consumer.
+ */
+void
+columnar_wal_insert(Relation rel, TupleTableSlot *slot, int options)
+{
+	/* decide first, so a skipped insert never pays for a tuple copy */
+	if ((options & HEAP_INSERT_NO_LOGICAL) || !RelationIsLogicallyLogged(rel))
+	{
+		return;
+	}
+
+	bool shouldFree = false;
+	HeapTuple heaptup = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
+
+	/* zero everything, offnum included, so no stack garbage reaches the WAL */
+	xl_columnar_insert xlrec;
+	memset(&xlrec, 0, sizeof(xlrec));
+
+	xlrec.target_node = rel->rd_node;
+	xlrec.flags = XLH_INSERT_CONTAINS_NEW_TUPLE;
+
+	if (options & HEAP_INSERT_SPECULATIVE)
+	{
+		xlrec.flags |= XLH_INSERT_IS_SPECULATIVE;
+	}
+
+	if (IsToastRelation(rel))
+	{
+		xlrec.flags |= XLH_INSERT_ON_TOAST_RELATION;
+	}
+
+	xlrec.t_infomask2 = heaptup->t_data->t_infomask2;
+	xlrec.t_infomask = heaptup->t_data->t_infomask;
+	xlrec.t_hoff = heaptup->t_data->t_hoff;
+
+	XLogBeginInsert();
+
+	XLogRegisterData((char *) &xlrec, SizeofColumnarInsert);
+	XLogRegisterData((char *) heaptup->t_data + SizeofHeapTupleHeader,
+					 heaptup->t_len - SizeofHeapTupleHeader);
+
+	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+	/* not associated with any buffer, so ignore returned LSN */
+	(void) XLogInsert(RM_COLUMNAR_ID, XLOG_COLUMNAR_INSERT);
+
+	if (shouldFree)
+	{
+		pfree(heaptup);
+	}
+}
+
+/*
+ * columnar_redo_restore_block applies the full-page image registered as
+ * block 0 of a columnar page record.
+ *
+ * Forcing an image at insert time only stores it in the record; it reaches
+ * the data file when the redo routine asks for the block, which is what
+ * XLogReadBufferForRedo() does here.
+ */
+static void
+columnar_redo_restore_block(XLogReaderState *record)
+{
+	Buffer buffer = InvalidBuffer;
+
+	if (XLogReadBufferForRedo(record, 0, &buffer) != BLK_RESTORED)
+	{
+		elog(ERROR, "columnar page record did not restore a full page image");
+	}
+
+	UnlockReleaseBuffer(buffer);
+}
+
+/*
+ * Page overwrite records always carry a forced full-page image, so redo
+ * consists of restoring that image.
+ */
+static void
+columnar_redo_page_overwrite(XLogReaderState *record)
+{
+	columnar_redo_restore_block(record);
+}
+
+/*
+ * Page append records always carry a forced full-page image, so redo
+ * consists of restoring that image.
+ */
+static void
+columnar_redo_page_append(XLogReaderState *record)
+{
+	columnar_redo_restore_block(record);
+}
+
+/*
+ * Columnar insert records are only for logical decoding. Nothing needs to
+ * be done for redo.
+ */
+static void
+columnar_redo_insert(XLogReaderState *record)
+{
+}
+
+/*
+ * columnar_redo dispatches a columnar WAL record to its redo routine and
+ * PANICs on a record type it does not know.
+ */
+static void
+columnar_redo(XLogReaderState *record)
+{
+	uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	switch (info)
+	{
+		case XLOG_COLUMNAR_PAGE_OVERWRITE:
+		{
+			columnar_redo_page_overwrite(record);
+			break;
+		}
+
+		case XLOG_COLUMNAR_PAGE_APPEND:
+		{
+			columnar_redo_page_append(record);
+			break;
+		}
+
+		case XLOG_COLUMNAR_INSERT:
+		{
+			columnar_redo_insert(record);
+			break;
+		}
+
+		default:
+		{
+			elog(PANIC, "columnar_redo: unknown op code %u", info);
+		}
+	}
+}
+
+/*
+ * columnar_desc appends a human-readable description of record to buf.
+ * Page records have no main data, so only insert records are detailed.
+ */
+static void
+columnar_desc(StringInfo buf, XLogReaderState *record)
+{
+	uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info == XLOG_COLUMNAR_INSERT)
+	{
+		xl_columnar_insert *xlrec = (xl_columnar_insert *) XLogRecGetData(record);
+
+		appendStringInfo(buf, "rel %u/%u/%u; flags 0x%02X",
+						 xlrec->target_node.spcNode, xlrec->target_node.dbNode,
+						 xlrec->target_node.relNode, xlrec->flags);
+	}
+}
+
+/*
+ * columnar_identify returns the name of the record type encoded in info, or
+ * NULL when the type is unknown.
+ */
+static const char *
+columnar_identify(uint8 info)
+{
+	switch (info & ~XLR_INFO_MASK)
+	{
+		case XLOG_COLUMNAR_PAGE_OVERWRITE:
+		{
+			return "PAGE_OVERWRITE";
+		}
+
+		case XLOG_COLUMNAR_PAGE_APPEND:
+		{
+			return "PAGE_APPEND";
+		}
+
+		case XLOG_COLUMNAR_INSERT:
+		{
+			return "INSERT";
+		}
+
+		default:
+		{
+			return NULL;
+		}
+	}
+}
+
+/* columnar_startup is the rm_startup callback; there is nothing to set up. */
+static void
+columnar_startup(void)
+{
+	elog(DEBUG1, "columnar_rm_startup()");
+}
+
+/* columnar_cleanup is the rm_cleanup callback; there is nothing to tear down. */
+static void
+columnar_cleanup(void)
+{
+	elog(DEBUG1, "columnar_rm_cleanup()");
+}
+
+/*
+ * columnar_mask blanks the parts of a page image that may legitimately differ
+ * between primary and standby: LSN, checksum and the unused space between
+ * pd_lower and pd_upper.
+ */
+static void
+columnar_mask(char *pagedata, BlockNumber blkno)
+{
+	mask_page_lsn_and_checksum(pagedata);
+	mask_unused_space(pagedata);
+}
+
+/*
+ * columnar_decode turns an XLOG_COLUMNAR_INSERT record into an insert change
+ * queued on the reorder buffer. Page records carry nothing of interest to
+ * logical decoding and are skipped.
+ */
+static void
+columnar_decode(struct LogicalDecodingContext *ctx,
+				struct XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+	TransactionId xid = XLogRecGetXid(r);
+	SnapBuild *builder = ctx->snapshot_builder;
+
+	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
+	/* page records have no main data; reading them as inserts would crash */
+	if (info != XLOG_COLUMNAR_INSERT)
+	{
+		return;
+	}
+
+	/*
+	 * Same gating as the heap decoder: without a full snapshot, or while
+	 * fast-forwarding, there is no point in decoding changes, and a change
+	 * may only be queued once the snapshot builder has accepted it. That
+	 * call is also expected to give the transaction its base snapshot.
+	 */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
+	{
+		return;
+	}
+
+	if (!SnapBuildProcessChange(builder, xid, buf->origptr))
+	{
+		return;
+	}
+
+	Size datalen = XLogRecGetDataLen(r);
+	if (datalen < SizeofColumnarInsert)
+	{
+		elog(ERROR, "columnar insert record is too short: %zu bytes", datalen);
+	}
+
+	xl_columnar_insert *xlrec = (xl_columnar_insert *) XLogRecGetData(r);
+
+	/*
+	 * Ignore insert records without new tuples (this does happen when
+	 * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
+	 */
+	if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
+	{
+		return;
+	}
+
+	/* only interested in our database */
+	if (xlrec->target_node.dbNode != ctx->slot->data.database)
+	{
+		return;
+	}
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (ctx->callbacks.filter_by_origin_cb != NULL &&
+		filter_by_origin_cb_wrapper(ctx, XLogRecGetOrigin(r)))
+	{
+		return;
+	}
+
+	ReorderBufferChange *change = ReorderBufferGetChange(ctx->reorder);
+	if (xlrec->flags & XLH_INSERT_IS_SPECULATIVE)
+	{
+		change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
+	}
+	else
+	{
+		change->action = REORDER_BUFFER_CHANGE_INSERT;
+	}
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.tp.relnode, &xlrec->target_node,
+		   sizeof(RelFileNode));
+
+	char *tupledata = XLogRecGetData(r) + SizeofColumnarInsert;
+	Size tuplelen = datalen - SizeofColumnarInsert;
+
+	change->data.tp.newtuple =
+		ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+
+	/* rebuild a heap tuple: zeroed header plus the logged tuple body */
+	ReorderBufferTupleBuf *newtuple = change->data.tp.newtuple;
+	newtuple->tuple.t_len = tuplelen + SizeofHeapTupleHeader;
+	HeapTupleHeader header = newtuple->tuple.t_data;
+	ItemPointerSetInvalid(&newtuple->tuple.t_self);
+	newtuple->tuple.t_tableOid = InvalidOid;
+	memset(header, 0, SizeofHeapTupleHeader);
+	memcpy((char *) newtuple->tuple.t_data + SizeofHeapTupleHeader,
+		   tupledata, tuplelen);
+
+	header->t_infomask = xlrec->t_infomask;
+	header->t_infomask2 = xlrec->t_infomask2;
+	header->t_hoff = xlrec->t_hoff;
+
+	change->data.tp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, xid, buf->origptr, change,
+							 xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
+}
diff --git a/src/backend/columnar/mod.c b/src/backend/columnar/mod.c
index c5112a5f4..8d9655188 100644
--- a/src/backend/columnar/mod.c
+++ b/src/backend/columnar/mod.c
@@ -19,6 +19,7 @@
 
 #include "columnar/columnar.h"
 #include "columnar/columnar_tableam.h"
+#include "columnar/columnar_wal.h"
 
 PG_MODULE_MAGIC;
 
diff --git a/src/include/columnar/columnar_wal.h b/src/include/columnar/columnar_wal.h
new file mode 100644
index 000000000..8c4ebe00e
--- /dev/null
+++ b/src/include/columnar/columnar_wal.h
@@ -0,0 +1,27 @@
+/*-------------------------------------------------------------------------
+ *
+ * columnar_wal.h
+ *
+ * Type and function declarations for Columnar
+ *
+ * Copyright (c) Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef COLUMNAR_WAL_H
+#define COLUMNAR_WAL_H
+
+#include "executor/tuptable.h"
+#include "storage/block.h"
+#include "utils/relcache.h"
+
+extern void columnar_wal_init(void);
+extern void columnar_wal_page_overwrite(Relation rel, BlockNumber blockno,
+										char *buf, uint32 len);
+extern void columnar_wal_page_append(Relation rel, BlockNumber blockno,
+									 uint32 offset, char *buf, uint32 len);
+extern void columnar_wal_insert(Relation rel, TupleTableSlot *slot,
+								int options);
+
+#endif /* COLUMNAR_WAL_H */