custom-rmgr-2
Jeff Davis 2022-02-10 17:48:52 -08:00
parent bd5a4da463
commit 0ce172b209
3 changed files with 100 additions and 11 deletions

View File

@ -58,6 +58,7 @@
#include "columnar/columnar_storage.h" #include "columnar/columnar_storage.h"
#include "columnar/columnar_tableam.h" #include "columnar/columnar_tableam.h"
#include "columnar/columnar_version_compat.h" #include "columnar/columnar_version_compat.h"
#include "columnar/columnar_wal.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
/* /*

View File

@ -9,14 +9,36 @@
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
#include "postgres.h"
#include "access/bufmask.h"
#include "access/heapam.h"
#include "access/heapam_xlog.h"
#include "access/htup_details.h"
#include "access/rmgr.h" #include "access/rmgr.h"
#include "access/xlog_internal.h" #include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "columnar/columnar_wal.h"
#include "executor/tuptable.h"
#include "miscadmin.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "safe_lib.h"
#include "storage/bufmgr.h"
#include "storage/relfilenode.h"
#include "utils/relcache.h"
#define RM_COLUMNAR_ID 241 #define RM_COLUMNAR_ID 241
#define RM_COLUMNAR_NAME "columnar" #define RM_COLUMNAR_NAME "columnar"
#define XLOG_COLUMNAR_PAGE_OVERWRITE 0
#define XLOG_COLUMNAR_PAGE_APPEND 1
#define XLOG_COLUMNAR_INSERT 2
typedef struct xl_columnar_insert typedef struct xl_columnar_insert
{ {
RelFileNode target_node;
OffsetNumber offnum; /* inserted tuple's offset */ OffsetNumber offnum; /* inserted tuple's offset */
uint16 t_infomask2; uint16 t_infomask2;
uint16 t_infomask; uint16 t_infomask;
@ -26,7 +48,8 @@ typedef struct xl_columnar_insert
#define SizeOfColumnarInsert (offsetof(xl_columnar_insert, flags) + sizeof(uint8)) #define SizeOfColumnarInsert (offsetof(xl_columnar_insert, flags) + sizeof(uint8))
static void columnar_redo_write(XLogReaderState *record); static void columnar_redo_page_overwrite(XLogReaderState *record);
static void columnar_redo_page_append(XLogReaderState *record);
static void columnar_redo_insert(XLogReaderState *record); static void columnar_redo_insert(XLogReaderState *record);
static void columnar_redo(XLogReaderState *record); static void columnar_redo(XLogReaderState *record);
@ -74,8 +97,8 @@ columnar_wal_page_overwrite(Relation rel, BlockNumber blockno, char *buf,
if (len > phdr->pd_upper - phdr->pd_lower) if (len > phdr->pd_upper - phdr->pd_lower)
{ {
elog(ERROR, elog(ERROR,
"write of columnar data of length %d to offset %d exceeds free space of block %d relation %d", "write of columnar data of length %d exceeds page size on block %d of relation %d",
len, offset, blockno, rel->rd_id); len, blockno, rel->rd_id);
} }
memcpy_s(page + phdr->pd_lower, phdr->pd_upper - phdr->pd_lower, buf, len); memcpy_s(page + phdr->pd_lower, phdr->pd_upper - phdr->pd_lower, buf, len);
@ -147,6 +170,8 @@ columnar_wal_insert(Relation rel, TupleTableSlot *slot, int options)
return; return;
} }
memcpy(&xlrec.target_node, &rel->rd_node, sizeof(RelFileNode));
xlrec.flags = 0; xlrec.flags = 0;
xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE; xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE;
@ -155,7 +180,7 @@ columnar_wal_insert(Relation rel, TupleTableSlot *slot, int options)
xlrec.flags |= XLH_INSERT_IS_SPECULATIVE; xlrec.flags |= XLH_INSERT_IS_SPECULATIVE;
} }
if (IsToastRelation(relation)) if (IsToastRelation(rel))
{ {
xlrec.flags |= XLH_INSERT_ON_TOAST_RELATION; xlrec.flags |= XLH_INSERT_ON_TOAST_RELATION;
} }
@ -173,11 +198,11 @@ columnar_wal_insert(Relation rel, TupleTableSlot *slot, int options)
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
/* not associated with any buffer, so ignore returned LSN */ /* not associated with any buffer, so ignore returned LSN */
(void) XLogInsert(RM_HEAP_ID, info); (void) XLogInsert(RM_COLUMNAR_ID, XLOG_COLUMNAR_INSERT);
if (shouldFree) if (shouldFree)
{ {
pfree(tuple); pfree(heaptup);
} }
} }
@ -239,7 +264,7 @@ columnar_desc(StringInfo buf, XLogReaderState *record)
static const char * static const char *
columnar_identify(uint8 info) columnar_identify(uint8 info)
{ {
return "";
} }
static void static void
@ -257,16 +282,78 @@ columnar_cleanup(void)
static void static void
columnar_mask(char *pagedata, BlockNumber blkno) columnar_mask(char *pagedata, BlockNumber blkno)
{ {
mask_page_lsn_and_checksum(page); mask_page_lsn_and_checksum(pagedata);
mask_unused_space(page); mask_unused_space(pagedata);
} }
static void static void
columnar_decode(struct LogicalDecodingContext *ctx, columnar_decode(struct LogicalDecodingContext *ctx,
struct XLogRecordBuffer *buf) struct XLogRecordBuffer *buf)
{ {
Size datalen;
char *tupledata;
Size tuplelen;
XLogReaderState *r = buf->record;
xl_columnar_insert *xlrec;
ReorderBufferChange *change;
xlrec = (xl_columnar_insert *) XLogRecGetData(r);
/*
* Ignore insert records without new tuples (this does happen when
* raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
*/
if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
return;
/* only interested in our database */
if (xlrec->target_node.dbNode != ctx->slot->data.database)
return;
/* output plugin doesn't look for this origin, no need to queue */
if (ctx->callbacks.filter_by_origin_cb != NULL &&
!filter_by_origin_cb_wrapper(ctx, XLogRecGetOrigin(r)))
{
return;
}
change = ReorderBufferGetChange(ctx->reorder);
if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
change->action = REORDER_BUFFER_CHANGE_INSERT;
else
change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
change->origin_id = XLogRecGetOrigin(r);
memcpy(&change->data.tp.relnode, &xlrec->target_node,
sizeof(RelFileNode));
datalen = XLogRecGetDataLen(r);
tupledata = XLogRecGetData(r) + SizeOfColumnarInsert;
tuplelen = datalen - SizeOfColumnarInsert;
change->data.tp.newtuple =
ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
ReorderBufferTupleBuf *newtuple = change->data.tp.newtuple;
newtuple->tuple.t_len = tuplelen + SizeofHeapTupleHeader;
HeapTupleHeader header = newtuple->tuple.t_data;
ItemPointerSetInvalid(&newtuple->tuple.t_self);
newtuple->tuple.t_tableOid = InvalidOid;
memset(header, 0, SizeofHeapTupleHeader);
memcpy(newtuple->tuple.t_data + SizeofHeapTupleHeader,
tupledata, tuplelen);
header->t_infomask = xlrec->t_infomask;
header->t_infomask2 = xlrec->t_infomask2;
header->t_hoff = xlrec->t_hoff;
change->data.tp.clear_toast_afterwards = true;
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
change,
xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
} }

View File

@ -12,11 +12,12 @@
#ifndef COLUMNAR_WAL_H #ifndef COLUMNAR_WAL_H
#define COLUMNAR_WAL_H #define COLUMNAR_WAL_H
extern void columnar_wal_init(); extern void columnar_wal_init(void);
extern void columnar_wal_page_overwrite(Relation rel, BlockNumber blockno, extern void columnar_wal_page_overwrite(Relation rel, BlockNumber blockno,
char *buf, uint32 len); char *buf, uint32 len);
extern void columnar_wal_page_append(Relation rel, BlockNumber blockno, extern void columnar_wal_page_append(Relation rel, BlockNumber blockno,
uint32 offset, char *buf, uint32 len); uint32 offset, char *buf, uint32 len);
extern void columnar_wal_insert(); extern void columnar_wal_insert(Relation rel, TupleTableSlot *slot,
int options);
#endif /* COLUMNAR_WAL_H */ #endif /* COLUMNAR_WAL_H */