diff --git a/src/backend/columnar/columnar_storage.c b/src/backend/columnar/columnar_storage.c index 9712e7160..0482670ec 100644 --- a/src/backend/columnar/columnar_storage.c +++ b/src/backend/columnar/columnar_storage.c @@ -48,6 +48,7 @@ #include "columnar/columnar.h" #include "columnar/columnar_storage.h" +#include "columnar/columnar_wal.h" /* @@ -697,6 +698,16 @@ static void WriteToBlock(Relation rel, BlockNumber blockno, uint32 offset, char *buf, uint32 len, bool clear) { + if (clear) + { + columnar_wal_page_overwrite(rel, blockno, buf, len); + } + else + { + columnar_wal_page_append(rel, blockno, offset, buf, len); + } + +#ifdef NOT_USED Buffer buffer = ReadBuffer(rel, blockno); GenericXLogState *state = GenericXLogStart(rel); @@ -742,6 +753,7 @@ WriteToBlock(Relation rel, BlockNumber blockno, uint32 offset, char *buf, GenericXLogFinish(state); UnlockReleaseBuffer(buffer); +#endif } diff --git a/src/backend/columnar/columnar_wal.c b/src/backend/columnar/columnar_wal.c index d31398490..7f4756372 100644 --- a/src/backend/columnar/columnar_wal.c +++ b/src/backend/columnar/columnar_wal.c @@ -18,6 +18,7 @@ #include "access/rmgr.h" #include "access/xlog_internal.h" #include "access/xloginsert.h" +#include "access/xlogutils.h" #include "catalog/catalog.h" #include "columnar/columnar_wal.h" #include "executor/tuptable.h" @@ -30,12 +31,14 @@ #include "storage/relfilenode.h" #include "utils/relcache.h" +#include + #define RM_COLUMNAR_ID 241 #define RM_COLUMNAR_NAME "columnar" -#define XLOG_COLUMNAR_PAGE_OVERWRITE 0 -#define XLOG_COLUMNAR_PAGE_APPEND 1 -#define XLOG_COLUMNAR_INSERT 2 +#define XLOG_COLUMNAR_PAGE_OVERWRITE 0x00 +#define XLOG_COLUMNAR_PAGE_APPEND 0x10 +#define XLOG_COLUMNAR_INSERT 0x20 typedef struct xl_columnar_insert { @@ -87,7 +90,7 @@ columnar_wal_page_overwrite(Relation rel, BlockNumber blockno, char *buf, XLogRecPtr recptr; Buffer buffer = ReadBuffer(rel, blockno); - elog(LOG, "columnar_wal_page_overwrite"); + elog(LOG, "columnar_wal_page_overwrite blockno %d len %d", blockno, len); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); START_CRIT_SECTION(); @@ -123,7 +126,8 @@ columnar_wal_page_append(Relation rel, BlockNumber blockno, uint32 offset, XLogRecPtr recptr; Buffer buffer = ReadBuffer(rel, blockno); - elog(LOG, "columnar_wal_page_append"); + elog(LOG, "columnar_wal_page_append: blockno %d offset %d len %d", + blockno, offset, len); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); START_CRIT_SECTION(); @@ -133,6 +137,7 @@ columnar_wal_page_append(Relation rel, BlockNumber blockno, uint32 offset, PageHeader phdr = (PageHeader) page; if (PageIsNew(page)) { + elog(LOG, "initializing page %d", blockno); PageInit(page, BLCKSZ, 0); } @@ -153,6 +158,7 @@ columnar_wal_page_append(Relation rel, BlockNumber blockno, uint32 offset, memcpy_s(page + phdr->pd_lower, phdr->pd_upper - phdr->pd_lower, buf, len); phdr->pd_lower += len; + MarkBufferDirty(buffer); XLogBeginInsert(); XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | REGBUF_FORCE_IMAGE); recptr = XLogInsert(RM_COLUMNAR_ID, XLOG_COLUMNAR_PAGE_APPEND); @@ -165,16 +171,18 @@ columnar_wal_page_append(Relation rel, BlockNumber blockno, uint32 offset, void columnar_wal_insert(Relation rel, TupleTableSlot *slot, int options) { - bool shouldFree = true; - HeapTuple heaptup = ExecFetchSlotHeapTuple(slot, true, &shouldFree); - xl_columnar_insert xlrec; + xl_columnar_insert xlrec; + bool shouldFree = true; - if (options & HEAP_INSERT_NO_LOGICAL) + if (!RelationIsLogicallyLogged(rel) || options & HEAP_INSERT_NO_LOGICAL) { return; } elog(LOG, "columnar_wal_insert"); + + HeapTuple heaptup = ExecFetchSlotHeapTuple(slot, true, &shouldFree); + memcpy(&xlrec.target_node, &rel->rd_node, sizeof(RelFileNode)); xlrec.flags = 0; @@ -211,23 +219,39 @@ columnar_wal_insert(Relation rel, TupleTableSlot *slot, int options) } } -/* - * Full page images are always forced, so this is a no-op. - */ static void columnar_redo_page_overwrite(XLogReaderState *record) { + Buffer buffer = InvalidBuffer; + elog(LOG, "columnar_redo_page_overwrite"); + + XLogRedoAction action = XLogReadBufferForRedo(record, 0, &buffer); + + /* full page image is forced, no redo necessary */ + Assert(action != BLK_NEEDS_REDO); + + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); + return; } -/* - * Full page images are always forced, so this is a no-op. - */ static void columnar_redo_page_append(XLogReaderState *record) { + Buffer buffer = InvalidBuffer; + elog(LOG, "columnar_redo_page_append"); + + XLogRedoAction action = XLogReadBufferForRedo(record, 0, &buffer); + + /* full page image is forced, no redo necessary */ + Assert(action != BLK_NEEDS_REDO); + + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); + return; } @@ -245,7 +269,7 @@ columnar_redo_insert(XLogReaderState *record) static void columnar_redo(XLogReaderState *record) { - uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; elog(LOG, "columnar_redo"); switch (info) @@ -267,20 +291,59 @@ columnar_redo(XLogReaderState *record) static void columnar_desc(StringInfo buf, XLogReaderState *record) { - elog(LOG, "columnar_redo"); + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + switch (info) + { + case XLOG_COLUMNAR_PAGE_OVERWRITE: + { + appendStringInfo(buf, "columnar desc overwrite"); + } + break; + case XLOG_COLUMNAR_PAGE_APPEND: + { + appendStringInfo(buf, "columnar desc append"); + } + break; + case XLOG_COLUMNAR_INSERT: + { + xl_columnar_insert xlrec; + + memcpy(&xlrec, rec, SizeofColumnarInsert); + appendStringInfo(buf, "columnar insert to relid: %d", + xlrec.target_node.relNode); + } + break; + } } static const char * columnar_identify(uint8 info) { - elog(LOG, "columnar_identify"); - return ""; + const char *id = NULL; + + switch (info) + { + case XLOG_COLUMNAR_PAGE_OVERWRITE: + id = "XLOG_COLUMNAR_PAGE_OVERWRITE"; + break; + case XLOG_COLUMNAR_PAGE_APPEND: + id = "XLOG_COLUMNAR_PAGE_APPEND"; + break; + case XLOG_COLUMNAR_INSERT: + id = "XLOG_COLUMNAR_INSERT"; + break; + } + + return id; } static void columnar_startup(void) { elog(LOG, "columnar_rm_startup()"); + sleep(10); } static void @@ -299,8 +362,22 @@ columnar_mask(char *pagedata, BlockNumber blkno) } static void -columnar_decode(struct LogicalDecodingContext *ctx, - struct XLogRecordBuffer *buf) +columnar_decode_page_overwrite(struct LogicalDecodingContext *ctx, + struct XLogRecordBuffer *buf) +{ + elog(LOG, "columnar_decode_page_overwrite"); +} + +static void +columnar_decode_page_append(struct LogicalDecodingContext *ctx, + struct XLogRecordBuffer *buf) +{ + elog(LOG, "columnar_decode_page_append"); +} + +static void +columnar_decode_insert(struct LogicalDecodingContext *ctx, + struct XLogRecordBuffer *buf) { Size datalen; char *tupledata; @@ -309,7 +386,7 @@ columnar_decode(struct LogicalDecodingContext *ctx, xl_columnar_insert *xlrec; ReorderBufferChange *change; - elog(LOG, "columnar_decode"); + elog(LOG, "columnar_decode_insert"); xlrec = (xl_columnar_insert *) XLogRecGetData(r); /* @@ -368,4 +445,27 @@ columnar_decode(struct LogicalDecodingContext *ctx, xlrec->flags & XLH_INSERT_ON_TOAST_RELATION); } +static void +columnar_decode(struct LogicalDecodingContext *ctx, + struct XLogRecordBuffer *buf) +{ + XLogReaderState *record = buf->record; + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + elog(LOG, "columnar_decode"); + switch (info) + { + case XLOG_COLUMNAR_PAGE_OVERWRITE: + columnar_decode_page_overwrite(ctx, buf); + break; + case XLOG_COLUMNAR_PAGE_APPEND: + columnar_decode_page_append(ctx, buf); + break; + case XLOG_COLUMNAR_INSERT: + columnar_decode_insert(ctx, buf); + break; + default: + elog(PANIC, "columnar_decode: unknown op code %u", info); + } +}