The call to MemoryContextReset frees all allocated memory, which is both more
efficient and more reliable than individually freeing each allocated object.pull/8/head
parent
e248eda6a1
commit
c6c48f8c16
|
@ -191,76 +191,6 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
|
||||||
ReorderBufferTXN *txn, XLogRecPtr commit_lsn) {
|
ReorderBufferTXN *txn, XLogRecPtr commit_lsn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/* convenience method to free up sub-messages */
|
|
||||||
static void row_message_destroy(Decoderbufs__RowMessage *msg) {
|
|
||||||
if (!msg) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (msg->table) {
|
|
||||||
pfree(msg->table);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (msg->n_new_tuple > 0) {
|
|
||||||
for (int i = 0; i < msg->n_new_tuple; i++) {
|
|
||||||
if (msg->new_tuple[i]) {
|
|
||||||
switch (msg->new_tuple[i]->datum_case) {
|
|
||||||
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING:
|
|
||||||
if (msg->new_tuple[i]->datum_string) {
|
|
||||||
pfree(msg->new_tuple[i]->datum_string);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES:
|
|
||||||
if (msg->new_tuple[i]->datum_bytes.data) {
|
|
||||||
pfree(msg->new_tuple[i]->datum_bytes.data);
|
|
||||||
msg->new_tuple[i]->datum_bytes.data = NULL;
|
|
||||||
msg->new_tuple[i]->datum_bytes.len = 0;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT:
|
|
||||||
if (msg->new_tuple[i]->datum_point) {
|
|
||||||
pfree(msg->new_tuple[i]->datum_point);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
pfree(msg->new_tuple[i]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pfree(msg->new_tuple);
|
|
||||||
}
|
|
||||||
if (msg->n_old_tuple > 0) {
|
|
||||||
for (int i = 0; i < msg->n_old_tuple; i++) {
|
|
||||||
if (msg->old_tuple[i]) {
|
|
||||||
switch (msg->old_tuple[i]->datum_case) {
|
|
||||||
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING:
|
|
||||||
if (msg->old_tuple[i]->datum_string) {
|
|
||||||
pfree(msg->old_tuple[i]->datum_string);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES:
|
|
||||||
if (msg->old_tuple[i]->datum_bytes.data) {
|
|
||||||
pfree(msg->old_tuple[i]->datum_bytes.data);
|
|
||||||
msg->old_tuple[i]->datum_bytes.data = NULL;
|
|
||||||
msg->old_tuple[i]->datum_bytes.len = 0;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT:
|
|
||||||
if (msg->old_tuple[i]->datum_point) {
|
|
||||||
pfree(msg->old_tuple[i]->datum_point);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
pfree(msg->old_tuple[i]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pfree(msg->old_tuple);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* print tuple datums (only used for debug-mode) */
|
/* print tuple datums (only used for debug-mode) */
|
||||||
static void print_tuple_datums(StringInfo out, Decoderbufs__DatumMessage **tup,
|
static void print_tuple_datums(StringInfo out, Decoderbufs__DatumMessage **tup,
|
||||||
size_t n) {
|
size_t n) {
|
||||||
|
@ -359,8 +289,6 @@ static double numeric_to_double_no_overflow(Numeric num) {
|
||||||
tmp)));
|
tmp)));
|
||||||
}
|
}
|
||||||
|
|
||||||
pfree(tmp);
|
|
||||||
|
|
||||||
return val;
|
return val;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -632,17 +560,16 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||||
TupleDesc tupdesc;
|
TupleDesc tupdesc;
|
||||||
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
|
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
|
||||||
|
|
||||||
elog(DEBUG1, "Entering decode_change callback");
|
elog(DEBUG1, "Entering decode_change callback");
|
||||||
|
|
||||||
|
/* Avoid leaking memory by using and resetting our own context */
|
||||||
|
data = ctx->output_plugin_private;
|
||||||
|
old = MemoryContextSwitchTo(data->context);
|
||||||
|
|
||||||
replident = relation->rd_rel->relreplident;
|
replident = relation->rd_rel->relreplident;
|
||||||
|
|
||||||
class_form = RelationGetForm(relation);
|
class_form = RelationGetForm(relation);
|
||||||
|
|
||||||
data = ctx->output_plugin_private;
|
|
||||||
|
|
||||||
/* Avoid leaking memory by using and resetting our own context */
|
|
||||||
old = MemoryContextSwitchTo(data->context);
|
|
||||||
|
|
||||||
RelationGetIndexList(relation);
|
RelationGetIndexList(relation);
|
||||||
is_rel_non_selective = (replident == REPLICA_IDENTITY_NOTHING ||
|
is_rel_non_selective = (replident == REPLICA_IDENTITY_NOTHING ||
|
||||||
(replident == REPLICA_IDENTITY_DEFAULT &&
|
(replident == REPLICA_IDENTITY_DEFAULT &&
|
||||||
|
@ -732,14 +659,10 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||||
void *packed = palloc(psize);
|
void *packed = palloc(psize);
|
||||||
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
|
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
|
||||||
appendBinaryStringInfo(ctx->out, packed, ssize);
|
appendBinaryStringInfo(ctx->out, packed, ssize);
|
||||||
/* free packed buffer */
|
|
||||||
pfree(packed);
|
|
||||||
}
|
}
|
||||||
OutputPluginWrite(ctx, true);
|
OutputPluginWrite(ctx, true);
|
||||||
|
|
||||||
/* cleanup msg */
|
/* Cleanup, freeing memory */
|
||||||
row_message_destroy(&rmsg);
|
|
||||||
|
|
||||||
MemoryContextSwitchTo(old);
|
MemoryContextSwitchTo(old);
|
||||||
MemoryContextReset(data->context);
|
MemoryContextReset(data->context);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue