diff --git a/src/decoderbufs.c b/src/decoderbufs.c index ea8859f..7e72998 100644 --- a/src/decoderbufs.c +++ b/src/decoderbufs.c @@ -195,76 +195,6 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { } -/* convenience method to free up sub-messages */ -static void row_message_destroy(Decoderbufs__RowMessage *msg) { - if (!msg) { - return; - } - - if (msg->table) { - pfree(msg->table); - } - - if (msg->n_new_tuple > 0) { - for (int i = 0; i < msg->n_new_tuple; i++) { - if (msg->new_tuple[i]) { - switch (msg->new_tuple[i]->datum_case) { - case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING: - if (msg->new_tuple[i]->datum_string) { - pfree(msg->new_tuple[i]->datum_string); - } - break; - case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES: - if (msg->new_tuple[i]->datum_bytes.data) { - pfree(msg->new_tuple[i]->datum_bytes.data); - msg->new_tuple[i]->datum_bytes.data = NULL; - msg->new_tuple[i]->datum_bytes.len = 0; - } - break; - case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT: - if (msg->new_tuple[i]->datum_point) { - pfree(msg->new_tuple[i]->datum_point); - } - break; - default: - break; - } - pfree(msg->new_tuple[i]); - } - } - pfree(msg->new_tuple); - } - if (msg->n_old_tuple > 0) { - for (int i = 0; i < msg->n_old_tuple; i++) { - if (msg->old_tuple[i]) { - switch (msg->old_tuple[i]->datum_case) { - case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING: - if (msg->old_tuple[i]->datum_string) { - pfree(msg->old_tuple[i]->datum_string); - } - break; - case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES: - if (msg->old_tuple[i]->datum_bytes.data) { - pfree(msg->old_tuple[i]->datum_bytes.data); - msg->old_tuple[i]->datum_bytes.data = NULL; - msg->old_tuple[i]->datum_bytes.len = 0; - } - break; - case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT: - if (msg->old_tuple[i]->datum_point) { - pfree(msg->old_tuple[i]->datum_point); - } - break; - default: - break; - } - pfree(msg->old_tuple[i]); - } - } - pfree(msg->old_tuple); - } -} - /* print tuple datums (only used for debug-mode) */ static void print_tuple_datums(StringInfo out, Decoderbufs__DatumMessage **tup, size_t n) { @@ -363,8 +293,6 @@ static double numeric_to_double_no_overflow(Numeric num) { tmp))); } - pfree(tmp); - return val; } @@ -676,17 +604,16 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TupleDesc tupdesc; Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT; - elog(DEBUG1, "Entering decode_change callback"); + elog(DEBUG1, "Entering decode_change callback"); + + /* Avoid leaking memory by using and resetting our own context */ + data = ctx->output_plugin_private; + old = MemoryContextSwitchTo(data->context); replident = relation->rd_rel->relreplident; class_form = RelationGetForm(relation); - data = ctx->output_plugin_private; - - /* Avoid leaking memory by using and resetting our own context */ - old = MemoryContextSwitchTo(data->context); - RelationGetIndexList(relation); is_rel_non_selective = (replident == REPLICA_IDENTITY_NOTHING || (replident == REPLICA_IDENTITY_DEFAULT && @@ -790,14 +717,10 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, void *packed = palloc(psize); size_t ssize = decoderbufs__row_message__pack(&rmsg, packed); appendBinaryStringInfo(ctx->out, packed, ssize); - /* free packed buffer */ - pfree(packed); } OutputPluginWrite(ctx, true); - /* cleanup msg */ - row_message_destroy(&rmsg); - + /* Cleanup, freeing memory */ MemoryContextSwitchTo(old); MemoryContextReset(data->context); }