diff --git a/src/decoderbufs.c b/src/decoderbufs.c index 3482d66..4cf9f5a 100644 --- a/src/decoderbufs.c +++ b/src/decoderbufs.c @@ -111,7 +111,7 @@ void _PG_output_plugin_init(OutputPluginCallbacks *cb) { /* initialize this plugin */ static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init) { - elog(INFO, "Entering startup callback"); + elog(DEBUG1, "Entering startup callback"); ListCell *option; DecoderData *data; @@ -157,6 +157,7 @@ static void pg_decode_startup(LogicalDecodingContext *ctx, /* cleanup this plugin's resources */ static void pg_decode_shutdown(LogicalDecodingContext *ctx) { + elog(DEBUG1, "Entering decode_shutdown callback"); DecoderData *data = ctx->output_plugin_private; /* cleanup our own resources via memory context reset */ @@ -537,32 +538,52 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, } } +/* return the number of valid attributes from the tuple */ +static int valid_attributes_count_from(TupleDesc tupdesc) { + int natt; + int count = 0; + for (natt = 0; natt < tupdesc->natts; natt++) { + Form_pg_attribute attr = tupdesc->attrs[natt]; + + /* skip dropped columns and system columns */ + if (attr->attisdropped || attr->attnum < 0) { + continue; + } + count++; + } + return count; +} + /* convert a PG tuple to an array of DatumMessage(s) */ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg, Relation relation, HeapTuple tuple, TupleDesc tupdesc) { int natt; - + elog(DEBUG1, "processing tuple with %d columns", tupdesc->natts); /* build column names and values */ for (natt = 0; natt < tupdesc->natts; natt++) { Form_pg_attribute attr; Datum origval; bool isnull; + const char *attrName; attr = tupdesc->attrs[natt]; - + Decoderbufs__DatumMessage datum_msg = DECODERBUFS__DATUM_MESSAGE__INIT; + /* skip dropped columns and system columns */ if (attr->attisdropped || attr->attnum < 0) { + elog(DEBUG1, "skipping column %d because %s", natt + 1, attr->attisdropped ? "it's a dropped column" : "it's a system column"); continue; - } + } - Decoderbufs__DatumMessage datum_msg = DECODERBUFS__DATUM_MESSAGE__INIT; + attrName = quote_identifier(NameStr(attr->attname)); + elog(DEBUG1, "processing column %d with name %s", natt + 1, attrName); /* set the column name */ - datum_msg.column_name = quote_identifier(NameStr(attr->attname)); - + datum_msg.column_name = attrName; + /* set datum from tuple */ - origval = fastgetattr(tuple, natt + 1, tupdesc, &isnull); + origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull); /* get output function */ datum_msg.column_type = attr->atttypid; @@ -582,6 +603,8 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg, Datum val = PointerGetDatum(PG_DETOAST_DATUM(origval)); set_datum_value(&datum_msg, attr->atttypid, typoutput, val); } + } else { + elog(DEBUG1, "column %s is null, ignoring value", attrName); } tmsg[natt] = palloc(sizeof(datum_msg)); @@ -592,12 +615,14 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg, /* callback for individual changed tuples */ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { + elog(DEBUG1, "Entering decode_change callback"); DecoderData *data; MemoryContext old; Form_pg_class class_form; char replident = relation->rd_rel->relreplident; bool is_rel_non_selective; + const char *selectiveInfo = is_rel_non_selective ? "non selective" : "selective"; Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT; class_form = RelationGetForm(relation); @@ -616,18 +641,22 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, rmsg.has_transaction_id = true; rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time); rmsg.has_commit_time = true; - rmsg.table = pstrdup(NameStr(class_form->relname)); - + rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))), + NameStr(class_form->relname))); + + /* decode different operation types */ switch (change->action) { case REORDER_BUFFER_CHANGE_INSERT: + elog(DEBUG1, "decoding INSERT for table %s; relation is %s", rmsg.table, selectiveInfo); rmsg.op = DECODERBUFS__OP__INSERT; rmsg.has_op = true; if (change->data.tp.newtuple != NULL) { + elog(DEBUG1, "decoding new tuple information"); TupleDesc tupdesc = RelationGetDescr(relation); - rmsg.n_new_tuple = tupdesc->natts; + rmsg.n_new_tuple = valid_attributes_count_from(tupdesc); rmsg.new_tuple = - palloc(sizeof(Decoderbufs__DatumMessage*) * tupdesc->natts); + palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple); tuple_to_tuple_msg(rmsg.new_tuple, relation, &change->data.tp.newtuple->tuple, tupdesc); } @@ -635,20 +664,23 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_UPDATE: rmsg.op = DECODERBUFS__OP__UPDATE; rmsg.has_op = true; + elog(DEBUG1, "decoding UPDATE for table %s; relation is %s", rmsg.table, selectiveInfo); if (!is_rel_non_selective) { if (change->data.tp.oldtuple != NULL) { + elog(DEBUG1, "decoding old tuple information"); TupleDesc tupdesc = RelationGetDescr(relation); - rmsg.n_old_tuple = tupdesc->natts; + rmsg.n_old_tuple = valid_attributes_count_from(tupdesc); rmsg.old_tuple = - palloc(sizeof(Decoderbufs__DatumMessage*) * tupdesc->natts); + palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple); tuple_to_tuple_msg(rmsg.old_tuple, relation, &change->data.tp.oldtuple->tuple, tupdesc); } if (change->data.tp.newtuple != NULL) { + elog(DEBUG1, "decoding new tuple information"); TupleDesc tupdesc = RelationGetDescr(relation); - rmsg.n_new_tuple = tupdesc->natts; + rmsg.n_new_tuple = valid_attributes_count_from(tupdesc); rmsg.new_tuple = - palloc(sizeof(Decoderbufs__DatumMessage*) * tupdesc->natts); + palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple); tuple_to_tuple_msg(rmsg.new_tuple, relation, &change->data.tp.newtuple->tuple, tupdesc); } @@ -657,17 +689,22 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_DELETE: rmsg.op = DECODERBUFS__OP__DELETE; rmsg.has_op = true; + elog(DEBUG1, "decoding DELETE for table %s; relation is %s", rmsg.table, selectiveInfo); /* if there was no PK, we only know that a delete happened */ if (!is_rel_non_selective && change->data.tp.oldtuple != NULL) { + elog(DEBUG1, "decoding old tuple information"); TupleDesc tupdesc = RelationGetDescr(relation); - rmsg.n_old_tuple = tupdesc->natts; + rmsg.n_old_tuple = valid_attributes_count_from(tupdesc); rmsg.old_tuple = - palloc(sizeof(Decoderbufs__DatumMessage*) * tupdesc->natts); + palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple); tuple_to_tuple_msg(rmsg.old_tuple, relation, &change->data.tp.oldtuple->tuple, tupdesc); + } else { + elog(WARNING, "no information to decode from DELETE because either no PK is present or REPLICA IDENTITY NOTHING or invalid "); } break; default: + elog(WARNING, "unknown change action"); Assert(0); break; }