From 0ce172b209371b7b821b48c4838b92923f7553bf Mon Sep 17 00:00:00 2001 From: Jeff Davis Date: Thu, 10 Feb 2022 17:48:52 -0800 Subject: [PATCH] WIP --- src/backend/columnar/columnar_tableam.c | 1 + src/backend/columnar/columnar_wal.c | 105 ++++++++++++++++++++++-- src/include/columnar/columnar_wal.h | 5 +- 3 files changed, 100 insertions(+), 11 deletions(-) diff --git a/src/backend/columnar/columnar_tableam.c b/src/backend/columnar/columnar_tableam.c index 64afe02a7..7a124efd1 100644 --- a/src/backend/columnar/columnar_tableam.c +++ b/src/backend/columnar/columnar_tableam.c @@ -58,6 +58,7 @@ #include "columnar/columnar_storage.h" #include "columnar/columnar_tableam.h" #include "columnar/columnar_version_compat.h" +#include "columnar/columnar_wal.h" #include "distributed/listutils.h" /* diff --git a/src/backend/columnar/columnar_wal.c b/src/backend/columnar/columnar_wal.c index 7fb4e5aa4..b41aed82a 100644 --- a/src/backend/columnar/columnar_wal.c +++ b/src/backend/columnar/columnar_wal.c @@ -9,14 +9,36 @@ *------------------------------------------------------------------------- */ +#include "postgres.h" + +#include "access/bufmask.h" +#include "access/heapam.h" +#include "access/heapam_xlog.h" +#include "access/htup_details.h" #include "access/rmgr.h" #include "access/xlog_internal.h" +#include "catalog/catalog.h" +#include "columnar/columnar_wal.h" +#include "executor/tuptable.h" +#include "miscadmin.h" +#include "replication/decode.h" +#include "replication/logical.h" +#include "replication/reorderbuffer.h" +#include "safe_lib.h" +#include "storage/bufmgr.h" +#include "storage/relfilenode.h" +#include "utils/relcache.h" #define RM_COLUMNAR_ID 241 #define RM_COLUMNAR_NAME "columnar" +#define XLOG_COLUMNAR_PAGE_OVERWRITE 0 +#define XLOG_COLUMNAR_PAGE_APPEND 1 +#define XLOG_COLUMNAR_INSERT 2 + typedef struct xl_columnar_insert { + RelFileNode target_node; OffsetNumber offnum; /* inserted tuple's offset */ uint16 t_infomask2; uint16 t_infomask; @@ -26,7 +48,8 @@ typedef struct xl_columnar_insert #define SizeOfColumnarInsert (offsetof(xl_columnar_insert, flags) + sizeof(uint8)) -static void columnar_redo_write(XLogReaderState *record); +static void columnar_redo_page_overwrite(XLogReaderState *record); +static void columnar_redo_page_append(XLogReaderState *record); static void columnar_redo_insert(XLogReaderState *record); static void columnar_redo(XLogReaderState *record); @@ -74,8 +97,8 @@ columnar_wal_page_overwrite(Relation rel, BlockNumber blockno, char *buf, if (len > phdr->pd_upper - phdr->pd_lower) { elog(ERROR, - "write of columnar data of length %d to offset %d exceeds free space of block %d relation %d", - len, offset, blockno, rel->rd_id); + "write of columnar data of length %d exceeds page size on block %d of relation %d", + len, blockno, rel->rd_id); } memcpy_s(page + phdr->pd_lower, phdr->pd_upper - phdr->pd_lower, buf, len); @@ -147,6 +170,8 @@ columnar_wal_insert(Relation rel, TupleTableSlot *slot, int options) return; } + memcpy(&xlrec.target_node, &rel->rd_node, sizeof(RelFileNode)); + xlrec.flags = 0; xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE; @@ -155,7 +180,7 @@ columnar_wal_insert(Relation rel, TupleTableSlot *slot, int options) xlrec.flags |= XLH_INSERT_IS_SPECULATIVE; } - if (IsToastRelation(relation)) + if (IsToastRelation(rel)) { xlrec.flags |= XLH_INSERT_ON_TOAST_RELATION; } @@ -173,11 +198,11 @@ columnar_wal_insert(Relation rel, TupleTableSlot *slot, int options) XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); /* not associated with any buffer, so ignore returned LSN */ - (void) XLogInsert(RM_HEAP_ID, info); + (void) XLogInsert(RM_COLUMNAR_ID, XLOG_COLUMNAR_INSERT); if (shouldFree) { - pfree(tuple); + pfree(heaptup); } } @@ -239,7 +264,7 @@ columnar_desc(StringInfo buf, XLogReaderState *record) static const char * columnar_identify(uint8 info) { - + return ""; } static void @@ -257,16 +282,78 @@ columnar_cleanup(void) static void columnar_mask(char *pagedata, BlockNumber blkno) { - mask_page_lsn_and_checksum(page); + mask_page_lsn_and_checksum(pagedata); - mask_unused_space(page); + mask_unused_space(pagedata); } static void columnar_decode(struct LogicalDecodingContext *ctx, struct XLogRecordBuffer *buf) { + Size datalen; + char *tupledata; + Size tuplelen; + XLogReaderState *r = buf->record; + xl_columnar_insert *xlrec; + ReorderBufferChange *change; + xlrec = (xl_columnar_insert *) XLogRecGetData(r); + + /* + * Ignore insert records without new tuples (this does happen when + * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL). + */ + if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE)) + return; + + /* only interested in our database */ + if (xlrec->target_node.dbNode != ctx->slot->data.database) + return; + + /* output plugin doesn't look for this origin, no need to queue */ + + if (ctx->callbacks.filter_by_origin_cb != NULL && + !filter_by_origin_cb_wrapper(ctx, XLogRecGetOrigin(r))) + { + return; + } + + change = ReorderBufferGetChange(ctx->reorder); + if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE)) + change->action = REORDER_BUFFER_CHANGE_INSERT; + else + change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT; + change->origin_id = XLogRecGetOrigin(r); + + memcpy(&change->data.tp.relnode, &xlrec->target_node, + sizeof(RelFileNode)); + + datalen = XLogRecGetDataLen(r); + tupledata = XLogRecGetData(r) + SizeOfColumnarInsert; + tuplelen = datalen - SizeOfColumnarInsert; + + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); + + ReorderBufferTupleBuf *newtuple = change->data.tp.newtuple; + newtuple->tuple.t_len = tuplelen + SizeofHeapTupleHeader; + HeapTupleHeader header = newtuple->tuple.t_data; + ItemPointerSetInvalid(&newtuple->tuple.t_self); + newtuple->tuple.t_tableOid = InvalidOid; + memset(header, 0, SizeofHeapTupleHeader); + memcpy(newtuple->tuple.t_data + SizeofHeapTupleHeader, + tupledata, tuplelen); + + header->t_infomask = xlrec->t_infomask; + header->t_infomask2 = xlrec->t_infomask2; + header->t_hoff = xlrec->t_hoff; + + change->data.tp.clear_toast_afterwards = true; + + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, + xlrec->flags & XLH_INSERT_ON_TOAST_RELATION); } diff --git a/src/include/columnar/columnar_wal.h b/src/include/columnar/columnar_wal.h index 8e8375990..8c4ebe00e 100644 --- a/src/include/columnar/columnar_wal.h +++ b/src/include/columnar/columnar_wal.h @@ -12,11 +12,12 @@ #ifndef COLUMNAR_WAL_H #define COLUMNAR_WAL_H -extern void columnar_wal_init(); +extern void columnar_wal_init(void); extern void columnar_wal_page_overwrite(Relation rel, BlockNumber blockno, char *buf, uint32 len); extern void columnar_wal_page_append(Relation rel, BlockNumber blockno, uint32 offset, char *buf, uint32 len); -extern void columnar_wal_insert(); +extern void columnar_wal_insert(Relation rel, TupleTableSlot *slot, + int options); #endif /* COLUMNAR_WAL_H */