From fb37f79073ccf50e62038596d2662d16e2450420 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Thu, 23 Jan 2020 13:26:38 +0100 Subject: [PATCH] DBZ-1052 Emit tx BEGIN/END messages --- proto/pg_logicaldec.proto | 2 + src/decoderbufs.c | 86 ++++++++++++++++++++++++++++++---- src/proto/pg_logicaldec.pb-c.c | 14 ++++-- src/proto/pg_logicaldec.pb-c.h | 4 +- 4 files changed, 90 insertions(+), 16 deletions(-) diff --git a/proto/pg_logicaldec.proto b/proto/pg_logicaldec.proto index 2f4a861..14aed7d 100644 --- a/proto/pg_logicaldec.proto +++ b/proto/pg_logicaldec.proto @@ -8,6 +8,8 @@ enum Op { INSERT = 0; UPDATE = 1; DELETE = 2; + BEGIN = 3; + COMMIT = 4; } message Point { diff --git a/src/decoderbufs.c b/src/decoderbufs.c index cf3308a..b22ebcb 100644 --- a/src/decoderbufs.c +++ b/src/decoderbufs.c @@ -164,16 +164,6 @@ static void pg_decode_shutdown(LogicalDecodingContext *ctx) { MemoryContextDelete(data->context); } -/* BEGIN callback */ -static void pg_decode_begin_txn(LogicalDecodingContext *ctx, - ReorderBufferTXN *txn) { -} - -/* COMMIT callback */ -static void pg_decode_commit_txn(LogicalDecodingContext *ctx, - ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { -} - /* print tuple datums (only used for debug-mode) */ static void print_tuple_datums(StringInfo out, Decoderbufs__DatumMessage **tup, size_t n) { @@ -491,6 +481,82 @@ static void add_metadata_to_msg(Decoderbufs__TypeInfo **tmsg, } } +/* BEGIN callback */ +static void pg_decode_begin_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn) { + + DecoderData *data; + MemoryContext old; + Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT; + elog(DEBUG1, "Entering begin callback"); + + + /* Avoid leaking memory by using and resetting our own context */ + data = ctx->output_plugin_private; + old = MemoryContextSwitchTo(data->context); + + rmsg.op = DECODERBUFS__OP__BEGIN; + rmsg.has_op = true; + rmsg.transaction_id = txn->xid; + rmsg.has_transaction_id = true; + rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time); + rmsg.has_commit_time = true; + + /* write msg */ + OutputPluginPrepareWrite(ctx, true); + if (data->debug_mode) { + print_row_msg(ctx->out, &rmsg); + } else { + size_t psize = decoderbufs__row_message__get_packed_size(&rmsg); + void *packed = palloc(psize); + size_t ssize = decoderbufs__row_message__pack(&rmsg, packed); + appendBinaryStringInfo(ctx->out, packed, ssize); + } + OutputPluginWrite(ctx, true); + + /* Cleanup, freeing memory */ + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); +} + +/* COMMIT callback */ +static void pg_decode_commit_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { + + DecoderData *data; + MemoryContext old; + Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT; + elog(DEBUG1, "Entering commit callback"); + + + /* Avoid leaking memory by using and resetting our own context */ + data = ctx->output_plugin_private; + old = MemoryContextSwitchTo(data->context); + + rmsg.op = DECODERBUFS__OP__COMMIT; + rmsg.has_op = true; + rmsg.transaction_id = txn->xid; + rmsg.has_transaction_id = true; + rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time); + rmsg.has_commit_time = true; + + /* write msg */ + OutputPluginPrepareWrite(ctx, true); + if (data->debug_mode) { + print_row_msg(ctx->out, &rmsg); + } else { + size_t psize = decoderbufs__row_message__get_packed_size(&rmsg); + void *packed = palloc(psize); + size_t ssize = decoderbufs__row_message__pack(&rmsg, packed); + appendBinaryStringInfo(ctx->out, packed, ssize); + } + OutputPluginWrite(ctx, true); + + /* Cleanup, freeing memory */ + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); +} + /* callback for individual changed tuples */ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { diff --git a/src/proto/pg_logicaldec.pb-c.c b/src/proto/pg_logicaldec.pb-c.c index 7f85065..058290e 100644 --- a/src/proto/pg_logicaldec.pb-c.c +++ b/src/proto/pg_logicaldec.pb-c.c @@ -565,17 +565,21 @@ const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor = (ProtobufCMessageInit) decoderbufs__row_message__init, NULL,NULL,NULL /* reserved[123] */ }; -static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[3] = +static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[5] = { { "INSERT", "DECODERBUFS__OP__INSERT", 0 }, { "UPDATE", "DECODERBUFS__OP__UPDATE", 1 }, { "DELETE", "DECODERBUFS__OP__DELETE", 2 }, + { "BEGIN", "DECODERBUFS__OP__BEGIN", 3 }, + { "COMMIT", "DECODERBUFS__OP__COMMIT", 4 }, }; static const ProtobufCIntRange decoderbufs__op__value_ranges[] = { -{0, 0},{0, 3} +{0, 0},{0, 5} }; -static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[3] = +static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[5] = { + { "BEGIN", 3 }, + { "COMMIT", 4 }, { "DELETE", 2 }, { "INSERT", 0 }, { "UPDATE", 1 }, @@ -587,9 +591,9 @@ const ProtobufCEnumDescriptor decoderbufs__op__descriptor = "Op", "Decoderbufs__Op", "decoderbufs", - 3, + 5, decoderbufs__op__enum_values_by_number, - 3, + 5, decoderbufs__op__enum_values_by_name, 1, decoderbufs__op__value_ranges, diff --git a/src/proto/pg_logicaldec.pb-c.h b/src/proto/pg_logicaldec.pb-c.h index 7082bdd..cfeb895 100644 --- a/src/proto/pg_logicaldec.pb-c.h +++ b/src/proto/pg_logicaldec.pb-c.h @@ -26,7 +26,9 @@ typedef struct _Decoderbufs__RowMessage Decoderbufs__RowMessage; typedef enum _Decoderbufs__Op { DECODERBUFS__OP__INSERT = 0, DECODERBUFS__OP__UPDATE = 1, - DECODERBUFS__OP__DELETE = 2 + DECODERBUFS__OP__DELETE = 2, + DECODERBUFS__OP__BEGIN = 3, + DECODERBUFS__OP__COMMIT = 4 PROTOBUF_C__FORCE_ENUM_TO_BE_INT_SIZE(DECODERBUFS__OP) } Decoderbufs__Op;