mirror of https://github.com/citusdata/citus.git
works
parent
0ce172b209
commit
f7c8f36836
|
@ -57,6 +57,7 @@ static const struct config_enum_entry columnar_compression_options[] =
|
||||||
void
|
void
|
||||||
columnar_init(void)
|
columnar_init(void)
|
||||||
{
|
{
|
||||||
|
columnar_wal_init();
|
||||||
columnar_init_gucs();
|
columnar_init_gucs();
|
||||||
columnar_tableam_init();
|
columnar_tableam_init();
|
||||||
}
|
}
|
||||||
|
|
|
@ -755,10 +755,11 @@ columnar_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
|
||||||
Datum *values = detoast_values(slot->tts_tupleDescriptor,
|
Datum *values = detoast_values(slot->tts_tupleDescriptor,
|
||||||
slot->tts_values, slot->tts_isnull);
|
slot->tts_values, slot->tts_isnull);
|
||||||
|
|
||||||
columnar_wal_insert(relation, slot, options);
|
|
||||||
uint64 writtenRowNumber = ColumnarWriteRow(writeState, values, slot->tts_isnull);
|
uint64 writtenRowNumber = ColumnarWriteRow(writeState, values, slot->tts_isnull);
|
||||||
slot->tts_tid = row_number_to_tid(writtenRowNumber);
|
slot->tts_tid = row_number_to_tid(writtenRowNumber);
|
||||||
|
|
||||||
|
columnar_wal_insert(relation, slot, options);
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
MemoryContextReset(ColumnarWritePerTupleContext(writeState));
|
MemoryContextReset(ColumnarWritePerTupleContext(writeState));
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,7 +46,7 @@ typedef struct xl_columnar_insert
|
||||||
uint8 t_hoff;
|
uint8 t_hoff;
|
||||||
} xl_columnar_insert;
|
} xl_columnar_insert;
|
||||||
|
|
||||||
#define SizeOfColumnarInsert (offsetof(xl_columnar_insert, flags) + sizeof(uint8))
|
#define SizeofColumnarInsert (offsetof(xl_columnar_insert, t_hoff) + sizeof(uint8))
|
||||||
|
|
||||||
static void columnar_redo_page_overwrite(XLogReaderState *record);
|
static void columnar_redo_page_overwrite(XLogReaderState *record);
|
||||||
static void columnar_redo_page_append(XLogReaderState *record);
|
static void columnar_redo_page_append(XLogReaderState *record);
|
||||||
|
@ -75,6 +75,7 @@ static RmgrData ColumnarRmgr = {
|
||||||
void
|
void
|
||||||
columnar_wal_init()
|
columnar_wal_init()
|
||||||
{
|
{
|
||||||
|
elog(LOG, "columnar_wal_init");
|
||||||
RegisterCustomRmgr(RM_COLUMNAR_ID, &ColumnarRmgr);
|
RegisterCustomRmgr(RM_COLUMNAR_ID, &ColumnarRmgr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,6 +86,7 @@ columnar_wal_page_overwrite(Relation rel, BlockNumber blockno, char *buf,
|
||||||
XLogRecPtr recptr;
|
XLogRecPtr recptr;
|
||||||
Buffer buffer = ReadBuffer(rel, blockno);
|
Buffer buffer = ReadBuffer(rel, blockno);
|
||||||
|
|
||||||
|
elog(LOG, "columnar_wal_page_overwrite");
|
||||||
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
|
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
|
||||||
|
|
||||||
START_CRIT_SECTION();
|
START_CRIT_SECTION();
|
||||||
|
@ -120,6 +122,7 @@ columnar_wal_page_append(Relation rel, BlockNumber blockno, uint32 offset,
|
||||||
XLogRecPtr recptr;
|
XLogRecPtr recptr;
|
||||||
Buffer buffer = ReadBuffer(rel, blockno);
|
Buffer buffer = ReadBuffer(rel, blockno);
|
||||||
|
|
||||||
|
elog(LOG, "columnar_wal_page_append");
|
||||||
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
|
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
|
||||||
|
|
||||||
START_CRIT_SECTION();
|
START_CRIT_SECTION();
|
||||||
|
@ -170,6 +173,7 @@ columnar_wal_insert(Relation rel, TupleTableSlot *slot, int options)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
elog(LOG, "columnar_wal_insert");
|
||||||
memcpy(&xlrec.target_node, &rel->rd_node, sizeof(RelFileNode));
|
memcpy(&xlrec.target_node, &rel->rd_node, sizeof(RelFileNode));
|
||||||
|
|
||||||
xlrec.flags = 0;
|
xlrec.flags = 0;
|
||||||
|
@ -191,7 +195,7 @@ columnar_wal_insert(Relation rel, TupleTableSlot *slot, int options)
|
||||||
|
|
||||||
XLogBeginInsert();
|
XLogBeginInsert();
|
||||||
|
|
||||||
XLogRegisterData((char *) &xlrec, SizeOfColumnarInsert);
|
XLogRegisterData((char *) &xlrec, SizeofColumnarInsert);
|
||||||
XLogRegisterData((char *) heaptup->t_data + SizeofHeapTupleHeader,
|
XLogRegisterData((char *) heaptup->t_data + SizeofHeapTupleHeader,
|
||||||
heaptup->t_len - SizeofHeapTupleHeader);
|
heaptup->t_len - SizeofHeapTupleHeader);
|
||||||
|
|
||||||
|
@ -212,6 +216,7 @@ columnar_wal_insert(Relation rel, TupleTableSlot *slot, int options)
|
||||||
static void
|
static void
|
||||||
columnar_redo_page_overwrite(XLogReaderState *record)
|
columnar_redo_page_overwrite(XLogReaderState *record)
|
||||||
{
|
{
|
||||||
|
elog(LOG, "columnar_redo_page_overwrite");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -221,6 +226,7 @@ columnar_redo_page_overwrite(XLogReaderState *record)
|
||||||
static void
|
static void
|
||||||
columnar_redo_page_append(XLogReaderState *record)
|
columnar_redo_page_append(XLogReaderState *record)
|
||||||
{
|
{
|
||||||
|
elog(LOG, "columnar_redo_page_append");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -231,6 +237,7 @@ columnar_redo_page_append(XLogReaderState *record)
|
||||||
static void
|
static void
|
||||||
columnar_redo_insert(XLogReaderState *record)
|
columnar_redo_insert(XLogReaderState *record)
|
||||||
{
|
{
|
||||||
|
elog(LOG, "columnar_redo_insert");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -239,6 +246,7 @@ columnar_redo(XLogReaderState *record)
|
||||||
{
|
{
|
||||||
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
|
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
|
||||||
|
|
||||||
|
elog(LOG, "columnar_redo");
|
||||||
switch (info)
|
switch (info)
|
||||||
{
|
{
|
||||||
case XLOG_COLUMNAR_PAGE_OVERWRITE:
|
case XLOG_COLUMNAR_PAGE_OVERWRITE:
|
||||||
|
@ -258,12 +266,13 @@ columnar_redo(XLogReaderState *record)
|
||||||
static void
|
static void
|
||||||
columnar_desc(StringInfo buf, XLogReaderState *record)
|
columnar_desc(StringInfo buf, XLogReaderState *record)
|
||||||
{
|
{
|
||||||
|
elog(LOG, "columnar_redo");
|
||||||
}
|
}
|
||||||
|
|
||||||
static const char *
|
static const char *
|
||||||
columnar_identify(uint8 info)
|
columnar_identify(uint8 info)
|
||||||
{
|
{
|
||||||
|
elog(LOG, "columnar_identify");
|
||||||
return "";
|
return "";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -282,6 +291,7 @@ columnar_cleanup(void)
|
||||||
static void
|
static void
|
||||||
columnar_mask(char *pagedata, BlockNumber blkno)
|
columnar_mask(char *pagedata, BlockNumber blkno)
|
||||||
{
|
{
|
||||||
|
elog(LOG, "columnar_mask");
|
||||||
mask_page_lsn_and_checksum(pagedata);
|
mask_page_lsn_and_checksum(pagedata);
|
||||||
|
|
||||||
mask_unused_space(pagedata);
|
mask_unused_space(pagedata);
|
||||||
|
@ -298,6 +308,7 @@ columnar_decode(struct LogicalDecodingContext *ctx,
|
||||||
xl_columnar_insert *xlrec;
|
xl_columnar_insert *xlrec;
|
||||||
ReorderBufferChange *change;
|
ReorderBufferChange *change;
|
||||||
|
|
||||||
|
elog(LOG, "columnar_decode");
|
||||||
xlrec = (xl_columnar_insert *) XLogRecGetData(r);
|
xlrec = (xl_columnar_insert *) XLogRecGetData(r);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -314,7 +325,7 @@ columnar_decode(struct LogicalDecodingContext *ctx,
|
||||||
/* output plugin doesn't look for this origin, no need to queue */
|
/* output plugin doesn't look for this origin, no need to queue */
|
||||||
|
|
||||||
if (ctx->callbacks.filter_by_origin_cb != NULL &&
|
if (ctx->callbacks.filter_by_origin_cb != NULL &&
|
||||||
!filter_by_origin_cb_wrapper(ctx, XLogRecGetOrigin(r)))
|
filter_by_origin_cb_wrapper(ctx, XLogRecGetOrigin(r)))
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -330,8 +341,8 @@ columnar_decode(struct LogicalDecodingContext *ctx,
|
||||||
sizeof(RelFileNode));
|
sizeof(RelFileNode));
|
||||||
|
|
||||||
datalen = XLogRecGetDataLen(r);
|
datalen = XLogRecGetDataLen(r);
|
||||||
tupledata = XLogRecGetData(r) + SizeOfColumnarInsert;
|
tupledata = XLogRecGetData(r) + SizeofColumnarInsert;
|
||||||
tuplelen = datalen - SizeOfColumnarInsert;
|
tuplelen = datalen - SizeofColumnarInsert;
|
||||||
|
|
||||||
change->data.tp.newtuple =
|
change->data.tp.newtuple =
|
||||||
ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
|
ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
|
||||||
|
@ -342,7 +353,7 @@ columnar_decode(struct LogicalDecodingContext *ctx,
|
||||||
ItemPointerSetInvalid(&newtuple->tuple.t_self);
|
ItemPointerSetInvalid(&newtuple->tuple.t_self);
|
||||||
newtuple->tuple.t_tableOid = InvalidOid;
|
newtuple->tuple.t_tableOid = InvalidOid;
|
||||||
memset(header, 0, SizeofHeapTupleHeader);
|
memset(header, 0, SizeofHeapTupleHeader);
|
||||||
memcpy(newtuple->tuple.t_data + SizeofHeapTupleHeader,
|
memcpy((char *) newtuple->tuple.t_data + SizeofHeapTupleHeader,
|
||||||
tupledata, tuplelen);
|
tupledata, tuplelen);
|
||||||
|
|
||||||
header->t_infomask = xlrec->t_infomask;
|
header->t_infomask = xlrec->t_infomask;
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
#include "columnar/columnar.h"
|
#include "columnar/columnar.h"
|
||||||
#include "columnar/columnar_tableam.h"
|
#include "columnar/columnar_tableam.h"
|
||||||
|
#include "columnar/columnar_wal.h"
|
||||||
|
|
||||||
|
|
||||||
PG_MODULE_MAGIC;
|
PG_MODULE_MAGIC;
|
||||||
|
|
Loading…
Reference in New Issue