mirror of https://github.com/citusdata/citus.git
use custom WAL
parent
aef5737421
commit
95762054a5
|
@ -48,6 +48,7 @@
|
|||
|
||||
#include "columnar/columnar.h"
|
||||
#include "columnar/columnar_storage.h"
|
||||
#include "columnar/columnar_wal.h"
|
||||
|
||||
|
||||
/*
|
||||
|
@ -697,6 +698,16 @@ static void
|
|||
WriteToBlock(Relation rel, BlockNumber blockno, uint32 offset, char *buf,
|
||||
uint32 len, bool clear)
|
||||
{
|
||||
if (clear)
|
||||
{
|
||||
columnar_wal_page_overwrite(rel, blockno, buf, len);
|
||||
}
|
||||
else
|
||||
{
|
||||
columnar_wal_page_append(rel, blockno, offset, buf, len);
|
||||
}
|
||||
|
||||
#ifdef NOT_USED
|
||||
Buffer buffer = ReadBuffer(rel, blockno);
|
||||
GenericXLogState *state = GenericXLogStart(rel);
|
||||
|
||||
|
@ -742,6 +753,7 @@ WriteToBlock(Relation rel, BlockNumber blockno, uint32 offset, char *buf,
|
|||
GenericXLogFinish(state);
|
||||
|
||||
UnlockReleaseBuffer(buffer);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include "access/rmgr.h"
|
||||
#include "access/xlog_internal.h"
|
||||
#include "access/xloginsert.h"
|
||||
#include "access/xlogutils.h"
|
||||
#include "catalog/catalog.h"
|
||||
#include "columnar/columnar_wal.h"
|
||||
#include "executor/tuptable.h"
|
||||
|
@ -30,12 +31,14 @@
|
|||
#include "storage/relfilenode.h"
|
||||
#include "utils/relcache.h"
|
||||
|
||||
#include <unistd.h>
|
||||
|
||||
#define RM_COLUMNAR_ID 241
|
||||
#define RM_COLUMNAR_NAME "columnar"
|
||||
|
||||
#define XLOG_COLUMNAR_PAGE_OVERWRITE 0
|
||||
#define XLOG_COLUMNAR_PAGE_APPEND 1
|
||||
#define XLOG_COLUMNAR_INSERT 2
|
||||
#define XLOG_COLUMNAR_PAGE_OVERWRITE 0x00
|
||||
#define XLOG_COLUMNAR_PAGE_APPEND 0x10
|
||||
#define XLOG_COLUMNAR_INSERT 0x20
|
||||
|
||||
typedef struct xl_columnar_insert
|
||||
{
|
||||
|
@ -87,7 +90,7 @@ columnar_wal_page_overwrite(Relation rel, BlockNumber blockno, char *buf,
|
|||
XLogRecPtr recptr;
|
||||
Buffer buffer = ReadBuffer(rel, blockno);
|
||||
|
||||
elog(LOG, "columnar_wal_page_overwrite");
|
||||
elog(LOG, "columnar_wal_page_overwrite blockno %d len %d", blockno, len);
|
||||
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
|
||||
|
||||
START_CRIT_SECTION();
|
||||
|
@ -123,7 +126,8 @@ columnar_wal_page_append(Relation rel, BlockNumber blockno, uint32 offset,
|
|||
XLogRecPtr recptr;
|
||||
Buffer buffer = ReadBuffer(rel, blockno);
|
||||
|
||||
elog(LOG, "columnar_wal_page_append");
|
||||
elog(LOG, "columnar_wal_page_append: blockno %d offset %d len %d",
|
||||
blockno, offset, len);
|
||||
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
|
||||
|
||||
START_CRIT_SECTION();
|
||||
|
@ -133,6 +137,7 @@ columnar_wal_page_append(Relation rel, BlockNumber blockno, uint32 offset,
|
|||
PageHeader phdr = (PageHeader) page;
|
||||
if (PageIsNew(page))
|
||||
{
|
||||
elog(LOG, "initializing page %d", blockno);
|
||||
PageInit(page, BLCKSZ, 0);
|
||||
}
|
||||
|
||||
|
@ -153,6 +158,7 @@ columnar_wal_page_append(Relation rel, BlockNumber blockno, uint32 offset,
|
|||
memcpy_s(page + phdr->pd_lower, phdr->pd_upper - phdr->pd_lower, buf, len);
|
||||
phdr->pd_lower += len;
|
||||
|
||||
MarkBufferDirty(buffer);
|
||||
XLogBeginInsert();
|
||||
XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | REGBUF_FORCE_IMAGE);
|
||||
recptr = XLogInsert(RM_COLUMNAR_ID, XLOG_COLUMNAR_PAGE_APPEND);
|
||||
|
@ -165,16 +171,18 @@ columnar_wal_page_append(Relation rel, BlockNumber blockno, uint32 offset,
|
|||
void
|
||||
columnar_wal_insert(Relation rel, TupleTableSlot *slot, int options)
|
||||
{
|
||||
bool shouldFree = true;
|
||||
HeapTuple heaptup = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
|
||||
xl_columnar_insert xlrec;
|
||||
xl_columnar_insert xlrec;
|
||||
bool shouldFree = true;
|
||||
|
||||
if (options & HEAP_INSERT_NO_LOGICAL)
|
||||
if (!RelationIsLogicallyLogged(rel) || options & HEAP_INSERT_NO_LOGICAL)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
elog(LOG, "columnar_wal_insert");
|
||||
|
||||
HeapTuple heaptup = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
|
||||
|
||||
memcpy(&xlrec.target_node, &rel->rd_node, sizeof(RelFileNode));
|
||||
|
||||
xlrec.flags = 0;
|
||||
|
@ -211,23 +219,39 @@ columnar_wal_insert(Relation rel, TupleTableSlot *slot, int options)
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Full page images are always forced, so this is a no-op.
|
||||
*/
|
||||
static void
|
||||
columnar_redo_page_overwrite(XLogReaderState *record)
|
||||
{
|
||||
Buffer buffer = InvalidBuffer;
|
||||
|
||||
elog(LOG, "columnar_redo_page_overwrite");
|
||||
|
||||
XLogRedoAction action = XLogReadBufferForRedo(record, 0, &buffer);
|
||||
|
||||
/* full page image is forced, no redo necessary */
|
||||
Assert(action != BLK_NEEDS_REDO);
|
||||
|
||||
if (BufferIsValid(buffer))
|
||||
UnlockReleaseBuffer(buffer);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Full page images are always forced, so this is a no-op.
|
||||
*/
|
||||
static void
|
||||
columnar_redo_page_append(XLogReaderState *record)
|
||||
{
|
||||
Buffer buffer = InvalidBuffer;
|
||||
|
||||
elog(LOG, "columnar_redo_page_append");
|
||||
|
||||
XLogRedoAction action = XLogReadBufferForRedo(record, 0, &buffer);
|
||||
|
||||
/* full page image is forced, no redo necessary */
|
||||
Assert(action != BLK_NEEDS_REDO);
|
||||
|
||||
if (BufferIsValid(buffer))
|
||||
UnlockReleaseBuffer(buffer);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -245,7 +269,7 @@ columnar_redo_insert(XLogReaderState *record)
|
|||
static void
|
||||
columnar_redo(XLogReaderState *record)
|
||||
{
|
||||
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
|
||||
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
|
||||
|
||||
elog(LOG, "columnar_redo");
|
||||
switch (info)
|
||||
|
@ -267,20 +291,59 @@ columnar_redo(XLogReaderState *record)
|
|||
static void
|
||||
columnar_desc(StringInfo buf, XLogReaderState *record)
|
||||
{
|
||||
elog(LOG, "columnar_redo");
|
||||
char *rec = XLogRecGetData(record);
|
||||
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
|
||||
|
||||
switch (info)
|
||||
{
|
||||
case XLOG_COLUMNAR_PAGE_OVERWRITE:
|
||||
{
|
||||
appendStringInfo(buf, "columnar desc overwrite");
|
||||
}
|
||||
break;
|
||||
case XLOG_COLUMNAR_PAGE_APPEND:
|
||||
{
|
||||
appendStringInfo(buf, "columnar desc append");
|
||||
}
|
||||
break;
|
||||
case XLOG_COLUMNAR_INSERT:
|
||||
{
|
||||
xl_columnar_insert xlrec;
|
||||
|
||||
memcpy(&xlrec, rec, SizeofColumnarInsert);
|
||||
appendStringInfo(buf, "columnar insert to relid: %d",
|
||||
xlrec.target_node.relNode);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static const char *
|
||||
columnar_identify(uint8 info)
|
||||
{
|
||||
elog(LOG, "columnar_identify");
|
||||
return "";
|
||||
const char *id = NULL;
|
||||
|
||||
switch (info)
|
||||
{
|
||||
case XLOG_COLUMNAR_PAGE_OVERWRITE:
|
||||
id = "XLOG_COLUMNAR_PAGE_OVERWRITE";
|
||||
break;
|
||||
case XLOG_COLUMNAR_PAGE_APPEND:
|
||||
id = "XLOG_COLUMNAR_PAGE_APPEND";
|
||||
break;
|
||||
case XLOG_COLUMNAR_INSERT:
|
||||
id = "XLOG_COLUMNAR_INSERT";
|
||||
break;
|
||||
}
|
||||
|
||||
return id;
|
||||
}
|
||||
|
||||
static void
|
||||
columnar_startup(void)
|
||||
{
|
||||
elog(LOG, "columnar_rm_startup()");
|
||||
sleep(10);
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -299,8 +362,22 @@ columnar_mask(char *pagedata, BlockNumber blkno)
|
|||
}
|
||||
|
||||
static void
|
||||
columnar_decode(struct LogicalDecodingContext *ctx,
|
||||
struct XLogRecordBuffer *buf)
|
||||
columnar_decode_page_overwrite(struct LogicalDecodingContext *ctx,
|
||||
struct XLogRecordBuffer *buf)
|
||||
{
|
||||
elog(LOG, "columnar_decode_page_overwrite");
|
||||
}
|
||||
|
||||
static void
|
||||
columnar_decode_page_append(struct LogicalDecodingContext *ctx,
|
||||
struct XLogRecordBuffer *buf)
|
||||
{
|
||||
elog(LOG, "columnar_decode_page_append");
|
||||
}
|
||||
|
||||
static void
|
||||
columnar_decode_insert(struct LogicalDecodingContext *ctx,
|
||||
struct XLogRecordBuffer *buf)
|
||||
{
|
||||
Size datalen;
|
||||
char *tupledata;
|
||||
|
@ -309,7 +386,7 @@ columnar_decode(struct LogicalDecodingContext *ctx,
|
|||
xl_columnar_insert *xlrec;
|
||||
ReorderBufferChange *change;
|
||||
|
||||
elog(LOG, "columnar_decode");
|
||||
elog(LOG, "columnar_decode_insert");
|
||||
xlrec = (xl_columnar_insert *) XLogRecGetData(r);
|
||||
|
||||
/*
|
||||
|
@ -368,4 +445,27 @@ columnar_decode(struct LogicalDecodingContext *ctx,
|
|||
xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
|
||||
}
|
||||
|
||||
static void
|
||||
columnar_decode(struct LogicalDecodingContext *ctx,
|
||||
struct XLogRecordBuffer *buf)
|
||||
{
|
||||
XLogReaderState *record = buf->record;
|
||||
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
|
||||
|
||||
elog(LOG, "columnar_decode");
|
||||
switch (info)
|
||||
{
|
||||
case XLOG_COLUMNAR_PAGE_OVERWRITE:
|
||||
columnar_decode_page_overwrite(ctx, buf);
|
||||
break;
|
||||
case XLOG_COLUMNAR_PAGE_APPEND:
|
||||
columnar_decode_page_append(ctx, buf);
|
||||
break;
|
||||
case XLOG_COLUMNAR_INSERT:
|
||||
columnar_decode_insert(ctx, buf);
|
||||
break;
|
||||
default:
|
||||
elog(PANIC, "columnar_decode: unknown op code %u", info);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue