diff --git a/src/decoderbufs.c b/src/decoderbufs.c index 4cf9f5a..8669c1e 100644 --- a/src/decoderbufs.c +++ b/src/decoderbufs.c @@ -111,11 +111,11 @@ void _PG_output_plugin_init(OutputPluginCallbacks *cb) { /* initialize this plugin */ static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init) { - elog(DEBUG1, "Entering startup callback"); - ListCell *option; DecoderData *data; + elog(DEBUG1, "Entering startup callback"); + data = palloc(sizeof(DecoderData)); data->context = AllocSetContextCreate( ctx->context, "decoderbufs context", ALLOCSET_DEFAULT_MINSIZE, @@ -157,8 +157,11 @@ static void pg_decode_startup(LogicalDecodingContext *ctx, /* cleanup this plugin's resources */ static void pg_decode_shutdown(LogicalDecodingContext *ctx) { + DecoderData *data; + elog(DEBUG1, "Entering decode_shutdown callback"); - DecoderData *data = ctx->output_plugin_private; + + data = ctx->output_plugin_private; /* cleanup our own resources via memory context reset */ MemoryContextDelete(data->context); @@ -399,10 +402,11 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, bytea *valptr = NULL; const char *output = NULL; Point *p = NULL; - Timestamp ts = NULL; + Timestamp ts; double duration; TimeTzADT *timetz = NULL; Interval *interval = NULL; + Decoderbufs__Point dp = DECODERBUFS__POINT__INIT; int size = 0; switch (typid) { @@ -510,7 +514,6 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, break; case POINTOID: p = DatumGetPointP(datum); - Decoderbufs__Point dp = DECODERBUFS__POINT__INIT; dp.x = p->x; dp.y = p->y; datum_msg->datum_point = palloc(sizeof(Decoderbufs__Point)); @@ -525,9 +528,10 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, geography_point_as_decoderbufs_point(datum, datum_msg->datum_point); datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT; } else { + int len; elog(WARNING, "Encountered unknown typid: %d, using bytes", typid); output = OidOutputFunctionCall(typoutput, datum); - int len = strlen(output); + len = strlen(output); size = sizeof(char) * len; datum_msg->datum_bytes.data = palloc(size); memcpy(datum_msg->datum_bytes.data, (uint8_t *)output, size); @@ -566,9 +570,11 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg, Datum origval; bool isnull; const char *attrName; + Oid typoutput; + bool typisvarlena; + Decoderbufs__DatumMessage datum_msg = DECODERBUFS__DATUM_MESSAGE__INIT; attr = tupdesc->attrs[natt]; - Decoderbufs__DatumMessage datum_msg = DECODERBUFS__DATUM_MESSAGE__INIT; /* skip dropped columns and system columns */ if (attr->attisdropped || attr->attnum < 0) { @@ -580,7 +586,7 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg, elog(DEBUG1, "processing column %d with name %s", natt + 1, attrName); /* set the column name */ - datum_msg.column_name = attrName; + datum_msg.column_name = (char *)attrName; /* set datum from tuple */ origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull); @@ -589,8 +595,6 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg, datum_msg.column_type = attr->atttypid; datum_msg.has_column_type = true; - Oid typoutput; - bool typisvarlena; /* query output function */ getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena); if (!isnull) { @@ -615,15 +619,20 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg, /* callback for individual changed tuples */ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { - elog(DEBUG1, "Entering decode_change callback"); DecoderData *data; MemoryContext old; Form_pg_class class_form; - char replident = relation->rd_rel->relreplident; + char replident; bool is_rel_non_selective; - const char *selectiveInfo = is_rel_non_selective ? "non selective" : "selective"; + const char *selectiveInfo; + TupleDesc tupdesc; Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT; + + elog(DEBUG1, "Entering decode_change callback"); + + replident = relation->rd_rel->relreplident; + class_form = RelationGetForm(relation); data = ctx->output_plugin_private; @@ -635,6 +644,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, is_rel_non_selective = (replident == REPLICA_IDENTITY_NOTHING || (replident == REPLICA_IDENTITY_DEFAULT && !OidIsValid(relation->rd_replidindex))); + selectiveInfo = is_rel_non_selective ? "non selective" : "selective"; /* set common fields */ rmsg.transaction_id = txn->xid; @@ -653,7 +663,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, rmsg.has_op = true; if (change->data.tp.newtuple != NULL) { elog(DEBUG1, "decoding new tuple information"); - TupleDesc tupdesc = RelationGetDescr(relation); + tupdesc = RelationGetDescr(relation); rmsg.n_new_tuple = valid_attributes_count_from(tupdesc); rmsg.new_tuple = palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple); @@ -668,7 +678,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (!is_rel_non_selective) { if (change->data.tp.oldtuple != NULL) { elog(DEBUG1, "decoding old tuple information"); - TupleDesc tupdesc = RelationGetDescr(relation); + tupdesc = RelationGetDescr(relation); rmsg.n_old_tuple = valid_attributes_count_from(tupdesc); rmsg.old_tuple = palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple); @@ -677,7 +687,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } if (change->data.tp.newtuple != NULL) { elog(DEBUG1, "decoding new tuple information"); - TupleDesc tupdesc = RelationGetDescr(relation); + tupdesc = RelationGetDescr(relation); rmsg.n_new_tuple = valid_attributes_count_from(tupdesc); rmsg.new_tuple = palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple); @@ -693,7 +703,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* if there was no PK, we only know that a delete happened */ if (!is_rel_non_selective && change->data.tp.oldtuple != NULL) { elog(DEBUG1, "decoding old tuple information"); - TupleDesc tupdesc = RelationGetDescr(relation); + tupdesc = RelationGetDescr(relation); rmsg.n_old_tuple = valid_attributes_count_from(tupdesc); rmsg.old_tuple = palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);