DBZ-1052 Emit tx BEGIN/END messages
parent
01126bfa89
commit
fb37f79073
|
@ -8,6 +8,8 @@ enum Op {
|
|||
INSERT = 0;
|
||||
UPDATE = 1;
|
||||
DELETE = 2;
|
||||
BEGIN = 3;
|
||||
COMMIT = 4;
|
||||
}
|
||||
|
||||
message Point {
|
||||
|
|
|
@ -164,16 +164,6 @@ static void pg_decode_shutdown(LogicalDecodingContext *ctx) {
|
|||
MemoryContextDelete(data->context);
|
||||
}
|
||||
|
||||
/* BEGIN callback */
|
||||
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn) {
|
||||
}
|
||||
|
||||
/* COMMIT callback */
|
||||
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn, XLogRecPtr commit_lsn) {
|
||||
}
|
||||
|
||||
/* print tuple datums (only used for debug-mode) */
|
||||
static void print_tuple_datums(StringInfo out, Decoderbufs__DatumMessage **tup,
|
||||
size_t n) {
|
||||
|
@ -491,6 +481,82 @@ static void add_metadata_to_msg(Decoderbufs__TypeInfo **tmsg,
|
|||
}
|
||||
}
|
||||
|
||||
/* BEGIN callback */
|
||||
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn) {
|
||||
|
||||
DecoderData *data;
|
||||
MemoryContext old;
|
||||
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
|
||||
elog(DEBUG1, "Entering begin callback");
|
||||
|
||||
|
||||
/* Avoid leaking memory by using and resetting our own context */
|
||||
data = ctx->output_plugin_private;
|
||||
old = MemoryContextSwitchTo(data->context);
|
||||
|
||||
rmsg.op = DECODERBUFS__OP__BEGIN;
|
||||
rmsg.has_op = true;
|
||||
rmsg.transaction_id = txn->xid;
|
||||
rmsg.has_transaction_id = true;
|
||||
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
|
||||
rmsg.has_commit_time = true;
|
||||
|
||||
/* write msg */
|
||||
OutputPluginPrepareWrite(ctx, true);
|
||||
if (data->debug_mode) {
|
||||
print_row_msg(ctx->out, &rmsg);
|
||||
} else {
|
||||
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
|
||||
void *packed = palloc(psize);
|
||||
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
|
||||
appendBinaryStringInfo(ctx->out, packed, ssize);
|
||||
}
|
||||
OutputPluginWrite(ctx, true);
|
||||
|
||||
/* Cleanup, freeing memory */
|
||||
MemoryContextSwitchTo(old);
|
||||
MemoryContextReset(data->context);
|
||||
}
|
||||
|
||||
/* COMMIT callback */
|
||||
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn, XLogRecPtr commit_lsn) {
|
||||
|
||||
DecoderData *data;
|
||||
MemoryContext old;
|
||||
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
|
||||
elog(DEBUG1, "Entering commit callback");
|
||||
|
||||
|
||||
/* Avoid leaking memory by using and resetting our own context */
|
||||
data = ctx->output_plugin_private;
|
||||
old = MemoryContextSwitchTo(data->context);
|
||||
|
||||
rmsg.op = DECODERBUFS__OP__COMMIT;
|
||||
rmsg.has_op = true;
|
||||
rmsg.transaction_id = txn->xid;
|
||||
rmsg.has_transaction_id = true;
|
||||
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
|
||||
rmsg.has_commit_time = true;
|
||||
|
||||
/* write msg */
|
||||
OutputPluginPrepareWrite(ctx, true);
|
||||
if (data->debug_mode) {
|
||||
print_row_msg(ctx->out, &rmsg);
|
||||
} else {
|
||||
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
|
||||
void *packed = palloc(psize);
|
||||
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
|
||||
appendBinaryStringInfo(ctx->out, packed, ssize);
|
||||
}
|
||||
OutputPluginWrite(ctx, true);
|
||||
|
||||
/* Cleanup, freeing memory */
|
||||
MemoryContextSwitchTo(old);
|
||||
MemoryContextReset(data->context);
|
||||
}
|
||||
|
||||
/* callback for individual changed tuples */
|
||||
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||
Relation relation, ReorderBufferChange *change) {
|
||||
|
|
|
@ -565,17 +565,21 @@ const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor =
|
|||
(ProtobufCMessageInit) decoderbufs__row_message__init,
|
||||
NULL,NULL,NULL /* reserved[123] */
|
||||
};
|
||||
static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[3] =
|
||||
static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[5] =
|
||||
{
|
||||
{ "INSERT", "DECODERBUFS__OP__INSERT", 0 },
|
||||
{ "UPDATE", "DECODERBUFS__OP__UPDATE", 1 },
|
||||
{ "DELETE", "DECODERBUFS__OP__DELETE", 2 },
|
||||
{ "BEGIN", "DECODERBUFS__OP__BEGIN", 3 },
|
||||
{ "COMMIT", "DECODERBUFS__OP__COMMIT", 4 },
|
||||
};
|
||||
static const ProtobufCIntRange decoderbufs__op__value_ranges[] = {
|
||||
{0, 0},{0, 3}
|
||||
{0, 0},{0, 5}
|
||||
};
|
||||
static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[3] =
|
||||
static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[5] =
|
||||
{
|
||||
{ "BEGIN", 3 },
|
||||
{ "COMMIT", 4 },
|
||||
{ "DELETE", 2 },
|
||||
{ "INSERT", 0 },
|
||||
{ "UPDATE", 1 },
|
||||
|
@ -587,9 +591,9 @@ const ProtobufCEnumDescriptor decoderbufs__op__descriptor =
|
|||
"Op",
|
||||
"Decoderbufs__Op",
|
||||
"decoderbufs",
|
||||
3,
|
||||
5,
|
||||
decoderbufs__op__enum_values_by_number,
|
||||
3,
|
||||
5,
|
||||
decoderbufs__op__enum_values_by_name,
|
||||
1,
|
||||
decoderbufs__op__value_ranges,
|
||||
|
|
|
@ -26,7 +26,9 @@ typedef struct _Decoderbufs__RowMessage Decoderbufs__RowMessage;
|
|||
typedef enum _Decoderbufs__Op {
|
||||
DECODERBUFS__OP__INSERT = 0,
|
||||
DECODERBUFS__OP__UPDATE = 1,
|
||||
DECODERBUFS__OP__DELETE = 2
|
||||
DECODERBUFS__OP__DELETE = 2,
|
||||
DECODERBUFS__OP__BEGIN = 3,
|
||||
DECODERBUFS__OP__COMMIT = 4
|
||||
PROTOBUF_C__FORCE_ENUM_TO_BE_INT_SIZE(DECODERBUFS__OP)
|
||||
} Decoderbufs__Op;
|
||||
|
||||
|
|
Loading…
Reference in New Issue